CMS 3D CMS Logo

PoolDBOutputService.cc
Go to the documentation of this file.
10 //
11 #include <vector>
12 #include <memory>
13 #include <cassert>
14 
15 // For PoolDBOutputService::currentTime() to work, we have to keep track
16 // of which stream is presently being processed on a given thread during the call of
17 // a module which calls that method.
// -1 means "no stream recorded for this thread".
18 static thread_local int s_streamIndex = -1;
19 
// Body of fillRecord (its signature line is not visible in this excerpt): builds one
// Record from the parameter set `recordPset` and registers it under its record name.
21  Record thisrecord;
22 
23  thisrecord.m_idName = recordPset.getParameter<std::string>("record");
24  thisrecord.m_tag = recordPset.getParameter<std::string>("tag");
25 
// A per-record "timetype" overrides the default name passed in as gTimeTypeStr.
26  thisrecord.m_timetype =
27  cond::time::timeTypeFromName(recordPset.getUntrackedParameter<std::string>("timetype", gTimeTypeStr));
28 
29  thisrecord.m_onlyAppendUpdatePolicy = recordPset.getUntrackedParameter<bool>("onlyAppendUpdatePolicy", false);
30 
31  thisrecord.m_refreshTime = recordPset.getUntrackedParameter<unsigned int>("refreshTime", 1);
32 
// std::map::insert keeps an existing entry, so a repeated record name does not replace
// the first registration.
33  m_records.insert(std::make_pair(thisrecord.m_idName, thisrecord));
34 
// Every record also gets a default-constructed log header entry under the same key.
35  cond::UserLogInfo userloginfo;
36  m_logheaders.insert(std::make_pair(thisrecord.m_idName, userloginfo));
37 }
38 
// Constructor (its header line is not visible in this excerpt): reads the service
// configuration from iConfig, opens the session, registers all "toPut" records and
// hooks the framework signals on iAR that keep m_currentTimes up to date.
40  : m_logger(iConfig.getUntrackedParameter<std::string>("jobName", "DBOutputService")),
41  m_currentTimes{},
42  m_session(),
43  m_transactionActive(false),
44  m_dbInitialised(false),
45  m_records(),
46  m_logheaders() {
// Service-wide time type; also the default for records that do not set their own.
47  std::string timetypestr = iConfig.getUntrackedParameter<std::string>("timetype", "runnumber");
48  m_timetype = cond::time::timeTypeFromName(timetypestr);
49  m_autoCommit = iConfig.getUntrackedParameter<bool>("autoCommit", true);
50  m_writeTransactionDelay = iConfig.getUntrackedParameter<unsigned int>("writeTransactionDelay", 0);
51  edm::ParameterSet connectionPset = iConfig.getParameter<edm::ParameterSet>("DBParameters");
52  m_connection.setParameters(connectionPset);
53  m_connection.setLogDestination(m_logger);
54  m_connection.configure();
55  std::string connectionString = iConfig.getParameter<std::string>("connect");
56  m_session = m_connection.createSession(connectionString, true);
// Logs are additionally sent to the same connection string only when requested.
57  bool saveLogsOnDb = iConfig.getUntrackedParameter<bool>("saveLogsOnDB", false);
58  if (saveLogsOnDb)
59  m_logger.setDbDestination(connectionString);
60  // implicit start
61  //doStartTransaction();
62  typedef std::vector<edm::ParameterSet> Parameters;
63  Parameters toPut = iConfig.getParameter<Parameters>("toPut");
64  for (Parameters::iterator itToPut = toPut.begin(); itToPut != toPut.end(); ++itToPut)
65  fillRecord(*itToPut, timetypestr);
66 
67  iAR.watchPostEndJob(this, &cond::service::PoolDBOutputService::postEndJob);
// One current-time slot per stream; sized once the framework announces its bounds.
68  iAR.watchPreallocate(
69  [this](edm::service::SystemBounds const& iBounds) { m_currentTimes.resize(iBounds.maxNumberOfStreams()); });
70  if (m_timetype == cond::timestamp) { //timestamp
// Timestamp mode tracks, per thread, which stream a module is currently running on.
72  iAR.watchPreModuleEvent(this, &cond::service::PoolDBOutputService::preModuleEvent);
73  iAR.watchPostModuleEvent(this, &cond::service::PoolDBOutputService::postModuleEvent);
74  } else if (m_timetype == cond::runnumber) { //runnumber
75  //NOTE: this assumes only one run is being processed at a time.
76  // This is true for 7_1_X, but plans are to allow multiple in flight at a time
77  s_streamIndex = 0;
78  iAR.watchPreGlobalBeginRun(this, &cond::service::PoolDBOutputService::preGlobalBeginRun);
79  } else if (m_timetype == cond::lumiid) {
80  //NOTE: this assumes only one lumi is being processed at a time.
81  // This is true for 7_1_X, but plans are to allow multiple in flight at a time
82  s_streamIndex = 0;
83  iAR.watchPreGlobalBeginLumi(this, &cond::service::PoolDBOutputService::preGlobalBeginLumi);
84  }
85 }
86 
88  const std::string& transactionId) {
// Tail of newReadOnlySession: returns the session m_connection builds for the given
// connection string and transaction id.
// NOTE(review): the declaration of `ret` is not visible in this excerpt.
90  ret = m_connection.createReadOnlySession(connectionString, transactionId);
91  return ret;
92 }
93 
95 
// Locks the tag of every configured record that is not flagged as new (the function
// header is not visible in this excerpt). Runs under m_mutex inside a transaction scope.
97  std::lock_guard<std::recursive_mutex> lock(m_mutex);
98  doStartTransaction();
99  cond::persistency::TransactionScope scope(m_session.transaction());
100  this->initDB();
101  for (auto& iR : m_records) {
// Records still flagged as new tags are skipped.
102  if (iR.second.m_isNewTag == false) {
103  cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
104  editor.lock();
105  }
106  }
// The commit is issued here only in auto-commit mode.
107  if (m_autoCommit) {
108  doCommitTransaction();
109  }
110  scope.close();
111 }
112 
// Counterpart of the locking routine: unlocks the tag of every configured record that
// is not flagged as new (the function header is not visible in this excerpt).
114  std::lock_guard<std::recursive_mutex> lock(m_mutex);
115  doStartTransaction();
116  cond::persistency::TransactionScope scope(m_session.transaction());
117  this->initDB();
118  for (auto& iR : m_records) {
// Records still flagged as new tags are skipped.
119  if (iR.second.m_isNewTag == false) {
120  cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
121  editor.unlock();
122  }
123  }
// The commit is issued here only in auto-commit mode.
124  if (m_autoCommit) {
125  doCommitTransaction();
126  }
127  scope.close();
128 }
129 
// Returns the tag name configured for recordName; an unregistered record is reported
// by lookUpRecord() through cond::throwException.
131  return this->lookUpRecord(recordName).m_tag;
132 }
133 
135  std::lock_guard<std::recursive_mutex> lock(m_mutex);
136  bool doCommit = false;
137  if (!m_transactionActive) {
138  m_session.transaction().start(true);
139  doCommit = true;
140  }
141  bool dbexists = false;
142  try {
143  dbexists = initDB(true);
144  } catch (const std::exception& er) {
145  cond::throwException(std::string(er.what()), "PoolDBOutputService::isNewTagRequest");
146  }
147  if (doCommit)
148  m_session.transaction().commit();
149  if (!dbexists)
150  return true;
151  auto& myrecord = this->lookUpRecord(recordName);
152  return myrecord.m_isNewTag;
153 }
154 
// Opens a transaction with start(false) unless one is already flagged active, and sets
// the flag (presumably doStartTransaction(); the header is not visible in this excerpt).
// No locking here: the callers visible in this excerpt take m_mutex first.
156  if (!m_transactionActive) {
157  m_session.transaction().start(false);
158  m_transactionActive = true;
159  }
160 }
161 
// Commits the session transaction if one is flagged active and clears the flag
// (presumably doCommitTransaction(); the header is not visible in this excerpt).
163  if (m_transactionActive) {
// A non-zero "writeTransactionDelay" postpones the commit by that many seconds.
164  if (m_writeTransactionDelay) {
165  m_logger.logWarning() << "Waiting " << m_writeTransactionDelay << "s before commit the changes...";
166  ::sleep(m_writeTransactionDelay);
167  }
168  m_session.transaction().commit();
169  m_transactionActive = false;
170  }
171 }
172 
// Mutex-protected wrapper around doStartTransaction() (presumably startTransaction();
// the header is not visible in this excerpt).
174  std::lock_guard<std::recursive_mutex> lock(m_mutex);
175  doStartTransaction();
176 }
177 
// Mutex-protected wrapper around doCommitTransaction() (presumably commitTransaction();
// the header is not visible in this excerpt).
179  std::lock_guard<std::recursive_mutex> lock(m_mutex);
180  doCommitTransaction();
181 }
182 
// One-time database setup (presumably initDB(bool readOnly); the header is not visible
// in this excerpt). Returns false only when the database is missing and readOnly is set;
// in that case nothing is created and the check is repeated on the next call.
184  if (!m_dbInitialised) {
185  if (!m_session.existsDatabase()) {
186  if (readOnly)
187  return false;
188  m_session.createDatabase();
189  } else {
// Existing database: records whose tag already has an IOV are no longer new tags.
190  for (auto& iR : m_records) {
191  if (m_session.existsIov(iR.second.m_tag)) {
192  iR.second.m_isNewTag = false;
193  }
194  }
195  }
196  m_dbInitialised = true;
197  }
198  return m_dbInitialised;
199 }
200 
202  const std::string& recordName) {
// Looks up the mutable Record registered under recordName; an unknown name is reported
// through cond::throwException with the context "PoolDBOutputService::getRecord".
203  std::map<std::string, Record>::iterator it = m_records.find(recordName);
204  if (it == m_records.end()) {
205  cond::throwException("The record \"" + recordName + "\" has not been registered.",
206  "PoolDBOutputService::getRecord");
207  }
208  return it->second;
209 }
210 
211 void cond::service::PoolDBOutputService::postEndJob() { commitTransaction(); }
212 
// Stores the timestamp carried by iContext in the slot of the stream iContext refers to
// (presumably preEventProcessing(); the header is not visible in this excerpt).
214  m_currentTimes[iContext.streamID().value()] = iContext.timestamp().value();
215 }
216 
218  edm::ModuleCallingContext const&) {
// Remembers, for the calling thread, the stream iContext refers to, so that
// currentTime() can pick the matching slot (presumably preModuleEvent()).
219  s_streamIndex = iContext.streamID().value();
220 }
221 
223  edm::ModuleCallingContext const&) {
// Resets the thread's stream index to the "no stream" marker
// (presumably postModuleEvent()).
224  s_streamIndex = -1;
225 }
226 
// Writes the run number taken from iContext into every stream slot
// (presumably preGlobalBeginRun(); the header is not visible in this excerpt).
228  for (auto& time : m_currentTimes) {
229  time = iContext.luminosityBlockID().run();
230  }
231 }
232 
// Writes the luminosity-block id value taken from iContext into every stream slot
// (presumably preGlobalBeginLumi(); the header is not visible in this excerpt).
234  for (auto& time : m_currentTimes) {
235  time = iContext.luminosityBlockID().value();
236  }
237 }
238 
240 
// Runs the database initialisation explicitly, under m_mutex and inside a transaction
// scope (forceInit, per the error context below; the header is not visible in this excerpt).
242  std::lock_guard<std::recursive_mutex> lock(m_mutex);
243  doStartTransaction();
244  cond::persistency::TransactionScope scope(m_session.transaction());
245  try {
246  initDB();
// The commit is issued here only in auto-commit mode.
247  if (m_autoCommit) {
248  doCommitTransaction();
249  }
250  } catch (const std::exception& er) {
// Failures are re-reported with this method as context.
251  cond::throwException(std::string(er.what()), "PoolDBOutputService::forceInit");
252  }
253  scope.close();
254 }
255 
257 
259 
// Returns the time stored for the stream recorded in the thread-local s_streamIndex
// (presumably currentTime(); the header is not visible in this excerpt).
// The assert fires when no stream index is recorded for the calling thread.
261  assert(-1 != s_streamIndex);
262  return m_currentTimes[s_streamIndex];
263 }
264 
266  cond::Time_t firstSinceTime,
267  const std::string& recordName) {
// Creates the tag configured for recordName and inserts its first IOV
// (firstSinceTime -> firstPayloadId). The tag must still be flagged as new.
268  std::lock_guard<std::recursive_mutex> lock(m_mutex);
269  doStartTransaction();
270  cond::persistency::TransactionScope scope(m_session.transaction());
271  try {
272  this->initDB();
273  auto& myrecord = this->getRecord(recordName);
274  if (!myrecord.m_isNewTag) {
275  cond::throwException(myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
276  }
277  m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << ", adding iov with since " << firstSinceTime
278  << " pointing to payload id " << firstPayloadId;
// NOTE(review): the line declaring `editor` (listing line 279) is not visible in this excerpt.
280  m_session.createIovForPayload(firstPayloadId, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
281  editor.setDescription("New Tag");
282  editor.insert(firstSinceTime, firstPayloadId);
// The user text stored for this record becomes the flush message.
283  cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
284  editor.flush(a.usertext);
// From now on the tag exists, so later writes for this record have to append.
285  myrecord.m_isNewTag = false;
286  if (m_autoCommit) {
287  doCommitTransaction();
288  }
289  } catch (const std::exception& er) {
// NOTE(review): context is spelled "createNewIov" here but "createNewIOV" above.
290  cond::throwException(std::string(er.what()), "PoolDBOutputService::createNewIov");
291  }
292  scope.close();
293 }
294 
295 // private method
// Variant that works on an already looked-up Record: creates the tag for the given
// payload type and inserts the first IOV. It takes no lock and opens no transaction itself.
297  const std::string payloadType,
298  cond::Time_t firstSinceTime,
299  Record& myrecord) {
300  m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << " for payload type " << payloadType
301  << ", adding iov with since " << firstSinceTime;
302  // FIX ME: synchronization type and description have to be passed as the other parameters?
// NOTE(review): the line declaring `editor` (listing line 303) is not visible in this excerpt.
304  m_session.createIov(payloadType, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
305  editor.setDescription("New Tag");
306  editor.insert(firstSinceTime, firstPayloadId);
// The user text stored for this record becomes the flush message.
307  cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
308  editor.flush(a.usertext);
309  myrecord.m_isNewTag = false;
310 }
311 
314  const std::string& recordName) {
// Appends an IOV (time -> payloadId) to the existing tag configured for recordName.
// Both a missing database and a tag still flagged as new are reported as errors.
315  bool ret = false;
316  std::lock_guard<std::recursive_mutex> lock(m_mutex);
317  doStartTransaction();
318  cond::persistency::TransactionScope scope(m_session.transaction());
319  try {
320  bool dbexists = this->initDB();
321  if (!dbexists) {
322  cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::appendSinceTime");
323  }
324  auto& myrecord = this->lookUpRecord(recordName);
325  if (myrecord.m_isNewTag) {
326  cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
327  "PoolDBOutputService::appendSinceTime");
328  }
// The actual insert is done by the Record-based overload.
329  ret = appendSinceTime(payloadId, time, myrecord);
330  if (m_autoCommit) {
331  doCommitTransaction();
332  }
333  } catch (const std::exception& er) {
334  cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
335  }
336  scope.close();
337  return ret;
338 }
339 
340 // private method
343  const Record& myrecord) {
// Variant that works on an already looked-up Record: inserts (time -> payloadId) into the
// record's tag and flushes with the record's user text. It takes no lock itself.
// Returns true whenever it returns normally; failures are re-reported via cond::throwException.
344  m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", adding iov with since " << time;
345  try {
346  cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
347  editor.insert(time, payloadId);
348  cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
349  editor.flush(a.usertext);
350  } catch (const std::exception& er) {
351  cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
352  }
353  return true;
354 }
355 
357  cond::Time_t sinceTime,
358  const std::string& recordName) {
359  std::lock_guard<std::recursive_mutex> lock(m_mutex);
360  doStartTransaction();
361  cond::persistency::TransactionScope scope(m_session.transaction());
362  try {
363  bool dbexists = this->initDB();
364  if (!dbexists) {
365  cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::eraseSinceTime");
366  }
367  auto& myrecord = this->lookUpRecord(recordName);
368  if (myrecord.m_isNewTag) {
369  cond::throwException(std::string("Cannot delete from non-existing tag ") + myrecord.m_tag,
370  "PoolDBOutputService::appendSinceTime");
371  }
372  m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", removing iov with since " << sinceTime
373  << " pointing to payload id " << payloadId;
374  cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
375  editor.erase(sinceTime, payloadId);
376  cond::UserLogInfo a = this->lookUpUserLogInfo(recordName);
377  editor.flush(a.usertext);
378  if (m_autoCommit) {
379  doCommitTransaction();
380  }
381  } catch (const std::exception& er) {
382  cond::throwException(std::string(er.what()), "PoolDBOutputService::eraseSinceTime");
383  }
384  scope.close();
385 }
386 
388  const std::string& recordName) {
// Read-only lookup of the Record registered under recordName; an unknown name is reported
// through cond::throwException with the context "PoolDBOutputService::lookUpRecord".
389  std::map<std::string, Record>::const_iterator it = m_records.find(recordName);
390  if (it == m_records.end()) {
391  cond::throwException("The record \"" + recordName + "\" has not been registered.",
392  "PoolDBOutputService::lookUpRecord");
393  }
394  return it->second;
395 }
396 
// Returns the log header stored for recordName; throws cond::Exception when there is
// no entry for that name (lookUpUserLogInfo, per the message below).
398  std::map<std::string, cond::UserLogInfo>::iterator it = m_logheaders.find(recordName);
399  if (it == m_logheaders.end())
400  throw cond::Exception("Log db was not set for record " + recordName +
401  " from PoolDBOutputService::lookUpUserLogInfo");
402  return it->second;
403 }
404 
// Sets the end of validity of the existing tag configured for recordName to lastTill
// (closeIOV, per the error contexts below; the header is not visible in this excerpt).
// Both a missing database and a tag still flagged as new are reported as errors.
406  std::lock_guard<std::recursive_mutex> lock(m_mutex);
407  doStartTransaction();
408  cond::persistency::TransactionScope scope(m_session.transaction());
409  try {
410  bool dbexists = this->initDB();
411  if (!dbexists) {
412  cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::closeIOV");
413  }
414  auto& myrecord = lookUpRecord(recordName);
415  if (myrecord.m_isNewTag) {
416  cond::throwException(std::string("Cannot close non-existing tag ") + myrecord.m_tag,
417  "PoolDBOutputService::closeIOV");
418  }
419  m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", closing with end of validity " << lastTill;
420  cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
421  editor.setEndOfValidity(lastTill);
// Unlike the insert/erase paths, the flush message here is fixed rather than the user text.
422  editor.flush("Tag closed.");
423  if (m_autoCommit) {
424  doCommitTransaction();
425  }
426  } catch (const std::exception& er) {
427  cond::throwException(std::string(er.what()), "PoolDBOutputService::closeIOV");
428  }
429  scope.close();
430 }
431 
433  const std::string& dataprovenance,
434  const std::string& usertext) {
// Overwrites provenance and user text of the log header stored for recordName;
// lookUpUserLogInfo() throws when the record has no log header entry.
435  cond::UserLogInfo& myloginfo = this->lookUpUserLogInfo(recordName);
436  myloginfo.provenance = dataprovenance;
437  myloginfo.usertext = usertext;
438 }
439 
440 // Still required.
// Fills `result` with the tag name of recordName and, when the tag has an IOV, its last
// interval. Returns true only in the latter case. Takes no lock and opens no transaction itself.
442  auto& record = lookUpRecord(recordName);
443  result.name = record.m_tag;
444  m_logger.logDebug() << "Fetching tag info for " << record.m_tag;
445  bool ret = false;
446  //use iovproxy to find out.
447  if (m_session.existsIov(record.m_tag)) {
448  cond::persistency::IOVProxy iov = m_session.readIov(record.m_tag);
449  result.lastInterval = iov.getLast();
450  ret = true;
451  }
452  return ret;
453 }
454 
455 // Still required.
// Locked, transactional front end of getTagInfo(): returns false when the database does
// not exist or the tag has no IOV, true when `result` received the tag's last interval.
457  std::lock_guard<std::recursive_mutex> lock(m_mutex);
458  bool ret = false;
459  bool doCommit = false;
// Open a read-only transaction only if none is flagged active; in that case it is
// also committed again below.
460  if (!m_transactionActive) {
461  m_session.transaction().start(true);
462  doCommit = true;
463  }
464  bool dbexists = false;
465  cond::persistency::TransactionScope scope(m_session.transaction());
466  try {
// Read-only init: a missing database is not created, it just yields false.
467  dbexists = initDB(true);
468  if (dbexists) {
469  ret = getTagInfo(recordName, result);
470  }
471  } catch (const std::exception& er) {
472  cond::throwException(std::string(er.what()), "PoolDBOutputService::tagInfo");
473  }
474  if (doCommit)
475  m_session.transaction().commit();
476  scope.close();
477  return ret;
478 }
static thread_local int s_streamIndex
const TimeTypeSpecs timeTypeSpecs[]
Definition: Time.cc:16
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
Base exception class for the object to relational access.
Definition: Exception.h:11
Timestamp const & timestamp() const
Definition: StreamContext.h:63
void preEventProcessing(edm::StreamContext const &)
void fillRecord(edm::ParameterSet &pset, const std::string &gTimeTypeStr)
Time_t beginValue
Definition: Time.h:41
const Record & lookUpRecord(const std::string &recordName)
ret
prodAgent to be discontinued
std::string provenance
Definition: Types.h:23
std::map< std::string, cond::UserLogInfo > m_logheaders
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
void setDescription(const std::string &description)
Definition: IOVEditor.cc:139
void preGlobalBeginLumi(edm::GlobalContext const &)
assert(be >=bs)
void throwException(const std::string &message, const std::string &methodName)
Definition: Exception.cc:18
std::string tag(const std::string &recordName)
T getUntrackedParameter(std::string const &, T const &) const
void closeIOV(Time_t lastTill, const std::string &recordName)
unsigned long long Time_t
Definition: Time.h:14
vector< ParameterSet > Parameters
TimeType timeTypeFromName(const std::string &name)
Definition: Time.cc:25
bool isNewTagRequest(const std::string &recordName)
StreamID const & streamID() const
Definition: StreamContext.h:55
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:62
void erase(cond::Time_t since, const cond::Hash &payloadHash)
Definition: IOVEditor.cc:174
RunNumber_t run() const
void setLogHeaderForRecord(const std::string &recordName, const std::string &provenance, const std::string &usertext)
std::map< std::string, Record > m_records
void insert(cond::Time_t since, const cond::Hash &payloadHash, bool checkType=false)
Definition: IOVEditor.cc:159
Record & getRecord(const std::string &recordName)
TimeValue_t value() const
Definition: Timestamp.h:38
void preGlobalBeginRun(edm::GlobalContext const &)
bool appendSinceTime(const std::string &payloadId, cond::Time_t sinceTime, const std::string &recordName)
void createNewIOV(const std::string &firstPayloadId, cond::Time_t firstSinceTime, const std::string &recordName)
bool getTagInfo(const std::string &recordName, cond::TagInfo_t &result)
std::string usertext
Definition: Types.h:24
cond::persistency::Session newReadOnlySession(const std::string &connectionString, const std::string &transactionId)
double a
Definition: hdecay.h:119
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void setEndOfValidity(cond::Time_t validity)
Definition: IOVEditor.cc:129
unsigned int value() const
Definition: StreamID.h:43
Time_t endValue
Definition: Time.h:42
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
bool tagInfo(const std::string &recordName, cond::TagInfo_t &result)
void eraseSinceTime(const std::string &payloadId, cond::Time_t sinceTime, const std::string &recordName)
cond::UserLogInfo & lookUpUserLogInfo(const std::string &recordName)
PoolDBOutputService(const edm::ParameterSet &iConfig, edm::ActivityRegistry &iAR)
cond::persistency::Session session() const