CMS 3D CMS Logo

PoolDBOutputService.cc
Go to the documentation of this file.
10 //
11 #include <vector>
12 #include <memory>
13 #include <cassert>
14 
16 
17 // For PoolDBOutputService::currentTime() to work we have to keep track
18 // of which stream is presently being processed on a given thread while
19 // a module that calls that method is running.
20 static thread_local int s_streamIndex = -1;
21 
23  Record thisrecord;
24 
25  thisrecord.m_idName = recordPset.getParameter<std::string>("record");
26  thisrecord.m_tag = recordPset.getParameter<std::string>("tag");
27 
28  thisrecord.m_timetype =
29  cond::time::timeTypeFromName(recordPset.getUntrackedParameter<std::string>("timetype", gTimeTypeStr));
30 
31  thisrecord.m_onlyAppendUpdatePolicy = recordPset.getUntrackedParameter<bool>("onlyAppendUpdatePolicy", false);
32 
33  thisrecord.m_refreshTime = recordPset.getUntrackedParameter<unsigned int>("refreshTime", 1);
34 
35  m_records.insert(std::make_pair(thisrecord.m_idName, thisrecord));
36 
37  cond::UserLogInfo userloginfo;
38  m_logheaders.insert(std::make_pair(thisrecord.m_idName, userloginfo));
39 }
40 
42  : m_logger(iConfig.getUntrackedParameter<std::string>("jobName", "DBOutputService")),
43  m_currentTimes{},
44  m_session(),
45  m_transactionActive(false),
46  m_dbInitialised(false),
47  m_records(),
48  m_logheaders() {
49  std::string timetypestr = iConfig.getUntrackedParameter<std::string>("timetype", "runnumber");
50  m_timetype = cond::time::timeTypeFromName(timetypestr);
51  m_autoCommit = iConfig.getUntrackedParameter<bool>("autoCommit", true);
52  m_writeTransactionDelay = iConfig.getUntrackedParameter<unsigned int>("writeTransactionDelay", 0);
53  edm::ParameterSet connectionPset = iConfig.getParameter<edm::ParameterSet>("DBParameters");
54  m_connection.setParameters(connectionPset);
55  m_connection.setLogDestination(m_logger);
56  m_connection.configure();
57  std::string connectionString = iConfig.getParameter<std::string>("connect");
58  m_session = m_connection.createSession(connectionString, true);
59  bool saveLogsOnDb = iConfig.getUntrackedParameter<bool>("saveLogsOnDB", false);
60  if (saveLogsOnDb)
61  m_logger.setDbDestination(connectionString);
62  // implicit start
63  //doStartTransaction();
64  typedef std::vector<edm::ParameterSet> Parameters;
65  Parameters toPut = iConfig.getParameter<Parameters>("toPut");
66  for (Parameters::iterator itToPut = toPut.begin(); itToPut != toPut.end(); ++itToPut)
67  fillRecord(*itToPut, timetypestr);
68 
69  iAR.watchPostEndJob(this, &cond::service::PoolDBOutputService::postEndJob);
70  iAR.watchPreallocate(
71  [this](edm::service::SystemBounds const& iBounds) { m_currentTimes.resize(iBounds.maxNumberOfStreams()); });
72  if (m_timetype == cond::timestamp) { //timestamp
74  iAR.watchPreModuleEvent(this, &cond::service::PoolDBOutputService::preModuleEvent);
75  iAR.watchPostModuleEvent(this, &cond::service::PoolDBOutputService::postModuleEvent);
76  } else if (m_timetype == cond::runnumber) { //runnumber
77  //NOTE: this assumes only one run is being processed at a time.
78  // This is true for 7_1_X, but the plan is to allow multiple runs in flight at a time.
79  s_streamIndex = 0;
80  iAR.watchPreGlobalBeginRun(this, &cond::service::PoolDBOutputService::preGlobalBeginRun);
81  } else if (m_timetype == cond::lumiid) {
82  //NOTE: this assumes only one lumi is being processed at a time.
83  // This is true for 7_1_X, but the plan is to allow multiple lumis in flight at a time.
84  s_streamIndex = 0;
85  iAR.watchPreGlobalBeginLumi(this, &cond::service::PoolDBOutputService::preGlobalBeginLumi);
86  }
87 }
88 
90  const std::string& transactionId) {
92  ret = m_connection.createReadOnlySession(connectionString, transactionId);
93  return ret;
94 }
95 
97 
99  std::lock_guard<std::recursive_mutex> lock(m_mutex);
100  doStartTransaction();
101  cond::persistency::TransactionScope scope(m_session.transaction());
102  this->initDB();
103  for (auto& iR : m_records) {
104  if (iR.second.m_isNewTag == false) {
105  cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
106  editor.lock();
107  }
108  }
109  if (m_autoCommit) {
110  doCommitTransaction();
111  }
112  scope.close();
113 }
114 
116  std::lock_guard<std::recursive_mutex> lock(m_mutex);
117  doStartTransaction();
118  cond::persistency::TransactionScope scope(m_session.transaction());
119  this->initDB();
120  for (auto& iR : m_records) {
121  if (iR.second.m_isNewTag == false) {
122  cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
123  editor.unlock();
124  }
125  }
126  if (m_autoCommit) {
127  doCommitTransaction();
128  }
129  scope.close();
130 }
131 
133  return this->lookUpRecord(recordName).m_tag;
134 }
135 
137  std::lock_guard<std::recursive_mutex> lock(m_mutex);
138  bool doCommit = false;
139  if (!m_transactionActive) {
140  m_session.transaction().start(true);
141  doCommit = true;
142  }
143  bool dbexists = false;
144  try {
145  dbexists = initDB(true);
146  } catch (const std::exception& er) {
147  cond::throwException(std::string(er.what()), "PoolDBOutputService::isNewTagRequest");
148  }
149  if (doCommit)
150  m_session.transaction().commit();
151  if (!dbexists)
152  return true;
153  auto& myrecord = this->lookUpRecord(recordName);
154  return myrecord.m_isNewTag;
155 }
156 
158  if (!m_transactionActive) {
159  m_session.transaction().start(false);
160  m_transactionActive = true;
161  }
162 }
163 
165  if (m_transactionActive) {
166  if (m_writeTransactionDelay) {
167  m_logger.logWarning() << "Waiting " << m_writeTransactionDelay << "s before commit the changes...";
168  ::sleep(m_writeTransactionDelay);
169  }
170  m_session.transaction().commit();
171  m_transactionActive = false;
172  }
173 }
174 
176  std::lock_guard<std::recursive_mutex> lock(m_mutex);
177  doStartTransaction();
178 }
179 
181  std::lock_guard<std::recursive_mutex> lock(m_mutex);
182  doCommitTransaction();
183 }
184 
186  if (!m_dbInitialised) {
187  if (!m_session.existsDatabase()) {
188  if (readOnly)
189  return false;
190  m_session.createDatabase();
191  } else {
192  for (auto& iR : m_records) {
193  if (m_session.existsIov(iR.second.m_tag)) {
194  iR.second.m_isNewTag = false;
195  }
196  }
197  }
198  m_dbInitialised = true;
199  }
200  return m_dbInitialised;
201 }
202 
204  const std::string& recordName) {
205  std::map<std::string, Record>::iterator it = m_records.find(recordName);
206  if (it == m_records.end()) {
207  cond::throwException("The record \"" + recordName + "\" has not been registered.",
208  "PoolDBOutputService::getRecord");
209  }
210  return it->second;
211 }
212 
213 void cond::service::PoolDBOutputService::postEndJob() { commitTransaction(); }
214 
216  m_currentTimes[iContext.streamID().value()] = iContext.timestamp().value();
217 }
218 
220  edm::ModuleCallingContext const&) {
221  s_streamIndex = iContext.streamID().value();
222 }
223 
225  edm::ModuleCallingContext const&) {
226  s_streamIndex = -1;
227 }
228 
230  for (auto& time : m_currentTimes) {
231  time = iContext.luminosityBlockID().run();
232  }
233 }
234 
236  for (auto& time : m_currentTimes) {
237  time = iContext.luminosityBlockID().value();
238  }
239 }
240 
242 
244  std::lock_guard<std::recursive_mutex> lock(m_mutex);
245  doStartTransaction();
246  cond::persistency::TransactionScope scope(m_session.transaction());
247  try {
248  initDB();
249  if (m_autoCommit) {
250  doCommitTransaction();
251  }
252  } catch (const std::exception& er) {
253  cond::throwException(std::string(er.what()), "PoolDBOutputService::forceInit");
254  }
255  scope.close();
256 }
257 
259 
261 
263  assert(-1 != s_streamIndex);
264  return m_currentTimes[s_streamIndex];
265 }
266 
268  cond::Time_t firstSinceTime,
269  const std::string& recordName) {
270  std::lock_guard<std::recursive_mutex> lock(m_mutex);
271  doStartTransaction();
272  cond::persistency::TransactionScope scope(m_session.transaction());
273  try {
274  this->initDB();
275  auto& myrecord = this->getRecord(recordName);
276  if (!myrecord.m_isNewTag) {
277  cond::throwException(myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
278  }
279  m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << ", adding iov with since " << firstSinceTime
280  << " pointing to payload id " << firstPayloadId;
282  m_session.createIovForPayload(firstPayloadId, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
283  editor.setDescription("New Tag");
284  editor.insert(firstSinceTime, firstPayloadId);
285  cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
286  editor.flush(a.usertext);
287  myrecord.m_isNewTag = false;
288  if (m_autoCommit) {
289  doCommitTransaction();
290  }
291  } catch (const std::exception& er) {
292  cond::throwException(std::string(er.what()), "PoolDBOutputService::createNewIov");
293  }
294  scope.close();
295 }
296 
297 // private method
299  const std::string payloadType,
300  cond::Time_t firstSinceTime,
301  Record& myrecord) {
302  m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << " for payload type " << payloadType
303  << ", adding iov with since " << firstSinceTime;
304  // FIX ME: synchronization type and description have to be passed as the other parameters?
306  m_session.createIov(payloadType, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
307  editor.setDescription("New Tag");
308  editor.insert(firstSinceTime, firstPayloadId);
309  cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
310  editor.flush(a.usertext);
311  myrecord.m_isNewTag = false;
312 }
313 
316  const std::string& recordName) {
317  bool ret = false;
318  std::lock_guard<std::recursive_mutex> lock(m_mutex);
319  doStartTransaction();
320  cond::persistency::TransactionScope scope(m_session.transaction());
321  try {
322  bool dbexists = this->initDB();
323  if (!dbexists) {
324  cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::appendSinceTime");
325  }
326  auto& myrecord = this->lookUpRecord(recordName);
327  if (myrecord.m_isNewTag) {
328  cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
329  "PoolDBOutputService::appendSinceTime");
330  }
331  ret = appendSinceTime(payloadId, time, myrecord);
332  if (m_autoCommit) {
333  doCommitTransaction();
334  }
335  } catch (const std::exception& er) {
336  cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
337  }
338  scope.close();
339  return ret;
340 }
341 
342 // private method
345  const Record& myrecord) {
346  m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", adding iov with since " << time;
347  try {
348  cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
349  editor.insert(time, payloadId);
350  cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
351  editor.flush(a.usertext);
352  } catch (const std::exception& er) {
353  cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
354  }
355  return true;
356 }
357 
359  cond::Time_t sinceTime,
360  const std::string& recordName) {
361  std::lock_guard<std::recursive_mutex> lock(m_mutex);
362  doStartTransaction();
363  cond::persistency::TransactionScope scope(m_session.transaction());
364  try {
365  bool dbexists = this->initDB();
366  if (!dbexists) {
367  cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::eraseSinceTime");
368  }
369  auto& myrecord = this->lookUpRecord(recordName);
370  if (myrecord.m_isNewTag) {
371  cond::throwException(std::string("Cannot delete from non-existing tag ") + myrecord.m_tag,
372  "PoolDBOutputService::appendSinceTime");
373  }
374  m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", removing iov with since " << sinceTime
375  << " pointing to payload id " << payloadId;
376  cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
377  editor.erase(sinceTime, payloadId);
378  cond::UserLogInfo a = this->lookUpUserLogInfo(recordName);
379  editor.flush(a.usertext);
380  if (m_autoCommit) {
381  doCommitTransaction();
382  }
383  } catch (const std::exception& er) {
384  cond::throwException(std::string(er.what()), "PoolDBOutputService::eraseSinceTime");
385  }
386  scope.close();
387 }
388 
390  const std::string& recordName) {
391  std::map<std::string, Record>::const_iterator it = m_records.find(recordName);
392  if (it == m_records.end()) {
393  cond::throwException("The record \"" + recordName + "\" has not been registered.",
394  "PoolDBOutputService::lookUpRecord");
395  }
396  return it->second;
397 }
398 
400  std::map<std::string, cond::UserLogInfo>::iterator it = m_logheaders.find(recordName);
401  if (it == m_logheaders.end())
402  throw cond::Exception("Log db was not set for record " + recordName +
403  " from PoolDBOutputService::lookUpUserLogInfo");
404  return it->second;
405 }
406 
408  std::lock_guard<std::recursive_mutex> lock(m_mutex);
409  doStartTransaction();
410  cond::persistency::TransactionScope scope(m_session.transaction());
411  try {
412  bool dbexists = this->initDB();
413  if (!dbexists) {
414  cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::closeIOV");
415  }
416  auto& myrecord = lookUpRecord(recordName);
417  if (myrecord.m_isNewTag) {
418  cond::throwException(std::string("Cannot close non-existing tag ") + myrecord.m_tag,
419  "PoolDBOutputService::closeIOV");
420  }
421  m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", closing with end of validity " << lastTill;
422  cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
423  editor.setEndOfValidity(lastTill);
424  editor.flush("Tag closed.");
425  if (m_autoCommit) {
426  doCommitTransaction();
427  }
428  } catch (const std::exception& er) {
429  cond::throwException(std::string(er.what()), "PoolDBOutputService::closeIOV");
430  }
431  scope.close();
432 }
433 
435  const std::string& dataprovenance,
436  const std::string& usertext) {
437  cond::UserLogInfo& myloginfo = this->lookUpUserLogInfo(recordName);
438  myloginfo.provenance = dataprovenance;
439  myloginfo.usertext = usertext;
440 }
441 
442 // Still required.
444  auto& record = lookUpRecord(recordName);
445  result.name = record.m_tag;
446  m_logger.logDebug() << "Fetching tag info for " << record.m_tag;
447  bool ret = false;
448  //use iovproxy to find out.
449  if (m_session.existsIov(record.m_tag)) {
450  cond::persistency::IOVProxy iov = m_session.readIov(record.m_tag);
451  result.lastInterval = iov.getLast();
452  ret = true;
453  }
454  return ret;
455 }
456 
457 // Still required.
459  std::lock_guard<std::recursive_mutex> lock(m_mutex);
460  bool ret = false;
461  bool doCommit = false;
462  if (!m_transactionActive) {
463  m_session.transaction().start(true);
464  doCommit = true;
465  }
466  bool dbexists = false;
467  cond::persistency::TransactionScope scope(m_session.transaction());
468  try {
469  dbexists = initDB(true);
470  if (dbexists) {
471  ret = getTagInfo(recordName, result);
472  }
473  } catch (const std::exception& er) {
474  cond::throwException(std::string(er.what()), "PoolDBOutputService::tagInfo");
475  }
476  if (doCommit)
477  m_session.transaction().commit();
478  scope.close();
479  return ret;
480 }
static thread_local int s_streamIndex
const TimeTypeSpecs timeTypeSpecs[]
Definition: Time.cc:16
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
Base exception class for the object to relational access.
Definition: Exception.h:11
Timestamp const & timestamp() const
Definition: StreamContext.h:63
void preEventProcessing(edm::StreamContext const &)
void fillRecord(edm::ParameterSet &pset, const std::string &gTimeTypeStr)
Time_t beginValue
Definition: Time.h:41
const Record & lookUpRecord(const std::string &recordName)
ret
prodAgent to be discontinued
std::string provenance
Definition: Types.h:23
std::map< std::string, cond::UserLogInfo > m_logheaders
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
void setDescription(const std::string &description)
Definition: IOVEditor.cc:139
void preGlobalBeginLumi(edm::GlobalContext const &)
assert(be >=bs)
void throwException(const std::string &message, const std::string &methodName)
Definition: Exception.cc:18
std::string tag(const std::string &recordName)
T getUntrackedParameter(std::string const &, T const &) const
void closeIOV(Time_t lastTill, const std::string &recordName)
unsigned long long Time_t
Definition: Time.h:14
TimeType timeTypeFromName(const std::string &name)
Definition: Time.cc:25
bool isNewTagRequest(const std::string &recordName)
StreamID const & streamID() const
Definition: StreamContext.h:55
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:66
void erase(cond::Time_t since, const cond::Hash &payloadHash)
Definition: IOVEditor.cc:174
RunNumber_t run() const
void setLogHeaderForRecord(const std::string &recordName, const std::string &provenance, const std::string &usertext)
std::map< std::string, Record > m_records
void insert(cond::Time_t since, const cond::Hash &payloadHash, bool checkType=false)
Definition: IOVEditor.cc:159
Record & getRecord(const std::string &recordName)
TimeValue_t value() const
Definition: Timestamp.h:38
void preGlobalBeginRun(edm::GlobalContext const &)
bool appendSinceTime(const std::string &payloadId, cond::Time_t sinceTime, const std::string &recordName)
void createNewIOV(const std::string &firstPayloadId, cond::Time_t firstSinceTime, const std::string &recordName)
bool getTagInfo(const std::string &recordName, cond::TagInfo_t &result)
std::vector< AlignmentParameters * > Parameters
Definition: Utilities.h:32
std::string usertext
Definition: Types.h:24
static const std::string kSharedResource
cond::persistency::Session newReadOnlySession(const std::string &connectionString, const std::string &transactionId)
double a
Definition: hdecay.h:121
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void setEndOfValidity(cond::Time_t validity)
Definition: IOVEditor.cc:129
unsigned int value() const
Definition: StreamID.h:43
Time_t endValue
Definition: Time.h:42
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
bool tagInfo(const std::string &recordName, cond::TagInfo_t &result)
void eraseSinceTime(const std::string &payloadId, cond::Time_t sinceTime, const std::string &recordName)
cond::UserLogInfo & lookUpUserLogInfo(const std::string &recordName)
PoolDBOutputService(const edm::ParameterSet &iConfig, edm::ActivityRegistry &iAR)
cond::persistency::Session session() const