CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
PoolDBOutputService.cc
Go to the documentation of this file.
10 //
11 #include <vector>
12 #include <memory>
13 #include <cassert>
14 
15 // For PoolDBOutputService::currentTime() to work, we have to keep track of
16 // which stream is currently being processed on a given thread while a module
17 // that calls that method is running.
18 static thread_local int s_streamIndex = -1;
19 
21  Record thisrecord;
22 
23  thisrecord.m_idName = recordPset.getParameter<std::string>("record");
24  thisrecord.m_tag = recordPset.getParameter<std::string>("tag");
25 
26  thisrecord.m_timetype =
27  cond::time::timeTypeFromName(recordPset.getUntrackedParameter<std::string>("timetype", gTimeTypeStr));
28 
29  thisrecord.m_onlyAppendUpdatePolicy = recordPset.getUntrackedParameter<bool>("onlyAppendUpdatePolicy", false);
30 
31  m_records.insert(std::make_pair(thisrecord.m_idName, thisrecord));
32 
33  cond::UserLogInfo userloginfo;
34  m_logheaders.insert(std::make_pair(thisrecord.m_idName, userloginfo));
35 }
36 
38  : m_logger(iConfig.getUntrackedParameter<std::string>("jobName", "DBOutputService")),
39  m_currentTimes{},
40  m_session(),
41  m_transactionActive(false),
42  m_dbInitialised(false),
43  m_records(),
44  m_logheaders() {
45  std::string timetypestr = iConfig.getUntrackedParameter<std::string>("timetype", "runnumber");
46  m_timetype = cond::time::timeTypeFromName(timetypestr);
47  m_autoCommit = iConfig.getUntrackedParameter<bool>("autoCommit", false);
48  m_writeTransactionDelay = iConfig.getUntrackedParameter<unsigned int>("writeTransactionDelay", 0);
49  edm::ParameterSet connectionPset = iConfig.getParameter<edm::ParameterSet>("DBParameters");
50  m_connection.setParameters(connectionPset);
51  m_connection.setLogDestination(m_logger);
52  m_connection.configure();
53  std::string connectionString = iConfig.getParameter<std::string>("connect");
54  m_session = m_connection.createSession(connectionString, true);
55  bool saveLogsOnDb = iConfig.getUntrackedParameter<bool>("saveLogsOnDB", false);
56  if (saveLogsOnDb)
57  m_logger.setDbDestination(connectionString);
58  // implicit start
59  doStartTransaction();
60  typedef std::vector<edm::ParameterSet> Parameters;
61  Parameters toPut = iConfig.getParameter<Parameters>("toPut");
62  for (Parameters::iterator itToPut = toPut.begin(); itToPut != toPut.end(); ++itToPut)
63  fillRecord(*itToPut, timetypestr);
64 
65  iAR.watchPostEndJob(this, &cond::service::PoolDBOutputService::postEndJob);
66  iAR.watchPreallocate(
67  [this](edm::service::SystemBounds const& iBounds) { m_currentTimes.resize(iBounds.maxNumberOfStreams()); });
68  if (m_timetype == cond::timestamp) { //timestamp
70  iAR.watchPreModuleEvent(this, &cond::service::PoolDBOutputService::preModuleEvent);
71  iAR.watchPostModuleEvent(this, &cond::service::PoolDBOutputService::postModuleEvent);
72  } else if (m_timetype == cond::runnumber) { //runnumber
73  //NOTE: this assumes only one run is being processed at a time.
74  // This is true for 7_1_X, but the plan is to allow multiple runs in flight at a time.
75  s_streamIndex = 0;
76  iAR.watchPreGlobalBeginRun(this, &cond::service::PoolDBOutputService::preGlobalBeginRun);
77  } else if (m_timetype == cond::lumiid) {
78  //NOTE: this assumes only one lumi is being processed at a time.
79  // This is true for 7_1_X, but the plan is to allow multiple lumis in flight at a time.
80  s_streamIndex = 0;
81  iAR.watchPreGlobalBeginLumi(this, &cond::service::PoolDBOutputService::preGlobalBeginLumi);
82  }
83 }
84 
86  const std::string& transactionId) {
88  ret = m_connection.createReadOnlySession(connectionString, transactionId);
89  return ret;
90 }
91 
93 
95  std::lock_guard<std::recursive_mutex> lock(m_mutex);
96  doStartTransaction();
97  cond::persistency::TransactionScope scope(m_session.transaction());
98  this->initDB();
99  for (auto& iR : m_records) {
100  if (iR.second.m_isNewTag == false) {
101  cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
102  editor.lock();
103  }
104  }
105  doCommitTransaction();
106  scope.close();
107 }
108 
110  std::lock_guard<std::recursive_mutex> lock(m_mutex);
111  doStartTransaction();
112  cond::persistency::TransactionScope scope(m_session.transaction());
113  this->initDB();
114  for (auto& iR : m_records) {
115  if (iR.second.m_isNewTag == false) {
116  cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
117  editor.unlock();
118  }
119  }
120  doCommitTransaction();
121  scope.close();
122 }
123 
125  return this->lookUpRecord(recordName).m_tag;
126 }
127 
129  Record& myrecord = this->lookUpRecord(recordName);
130  return myrecord.m_isNewTag;
131 }
132 
134  if (!m_transactionActive) {
135  m_session.transaction().start(false);
136  m_transactionActive = true;
137  }
138 }
139 
141  if (m_transactionActive) {
142  m_session.transaction().commit();
143  m_transactionActive = false;
144  }
145 }
146 
148  std::lock_guard<std::recursive_mutex> lock(m_mutex);
149  doStartTransaction();
150 }
151 
153  std::lock_guard<std::recursive_mutex> lock(m_mutex);
154  doCommitTransaction();
155 }
156 
158  if (!m_dbInitialised) {
159  if (!m_session.existsDatabase())
160  m_session.createDatabase();
161  else {
162  for (auto& iR : m_records) {
163  if (m_session.existsIov(iR.second.m_tag))
164  iR.second.m_isNewTag = false;
165  }
166  }
167  m_dbInitialised = true;
168  }
169 }
170 
171 void cond::service::PoolDBOutputService::postEndJob() { commitTransaction(); }
172 
174  m_currentTimes[iContext.streamID().value()] = iContext.timestamp().value();
175 }
176 
178  edm::ModuleCallingContext const&) {
179  s_streamIndex = iContext.streamID().value();
180 }
181 
183  edm::ModuleCallingContext const&) {
184  s_streamIndex = -1;
185 }
186 
188  for (auto& time : m_currentTimes) {
189  time = iContext.luminosityBlockID().run();
190  }
191 }
192 
194  for (auto& time : m_currentTimes) {
195  time = iContext.luminosityBlockID().value();
196  }
197 }
198 
200 
202  std::lock_guard<std::recursive_mutex> lock(m_mutex);
203  doStartTransaction();
204  cond::persistency::TransactionScope scope(m_session.transaction());
205  try {
206  initDB();
207  } catch (const std::exception& er) {
208  cond::throwException(std::string(er.what()), "PoolDBOutputService::forceInit");
209  }
210  scope.close();
211 }
212 
214 
216 
218  assert(-1 != s_streamIndex);
219  return m_currentTimes[s_streamIndex];
220 }
221 
223  cond::Time_t firstSinceTime,
224  const std::string& recordName) {
225  std::lock_guard<std::recursive_mutex> lock(m_mutex);
226  Record& myrecord = this->lookUpRecord(recordName);
227  if (!myrecord.m_isNewTag) {
228  cond::throwException(myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
229  }
230  m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << ", adding iov with since " << firstSinceTime
231  << " pointing to payload id " << firstPayloadId;
232  doStartTransaction();
233  cond::persistency::TransactionScope scope(m_session.transaction());
234  try {
235  this->initDB();
237  m_session.createIovForPayload(firstPayloadId, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
238  editor.setDescription("New Tag");
239  editor.insert(firstSinceTime, firstPayloadId);
240  cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
241  editor.flush(a.usertext);
242  myrecord.m_isNewTag = false;
243  } catch (const std::exception& er) {
244  cond::throwException(std::string(er.what()), "PoolDBOutputService::createNewIov");
245  }
246  scope.close();
247 }
248 
250  const std::string payloadType,
251  cond::Time_t firstSinceTime,
252  Record& myrecord) {
253  m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << " for payload type " << payloadType
254  << ", adding iov with since " << firstSinceTime;
255  // FIX ME: synchronization type and description have to be passed as the other parameters?
257  m_session.createIov(payloadType, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
258  editor.setDescription("New Tag");
259  editor.insert(firstSinceTime, firstPayloadId);
260  cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
261  editor.flush(a.usertext);
262  myrecord.m_isNewTag = false;
263 }
264 
266  cond::Time_t time,
267  const std::string& recordName) {
268  std::lock_guard<std::recursive_mutex> lock(m_mutex);
269  Record& myrecord = this->lookUpRecord(recordName);
270  if (myrecord.m_isNewTag) {
271  cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
272  "PoolDBOutputService::appendSinceTime");
273  }
274  bool ret = false;
275  doStartTransaction();
276  cond::persistency::TransactionScope scope(m_session.transaction());
277  try {
278  ret = appendSinceTime(payloadId, time, myrecord);
279  } catch (const std::exception& er) {
280  cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
281  }
282  scope.close();
283  return ret;
284 }
285 
287  cond::Time_t time,
288  Record& myrecord) {
289  m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", adding iov with since " << time;
290  try {
291  cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
292  editor.insert(time, payloadId);
293  cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
294  editor.flush(a.usertext);
295  } catch (const std::exception& er) {
296  cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
297  }
298  return true;
299 }
300 
302  cond::Time_t sinceTime,
303  const std::string& recordName) {
304  std::lock_guard<std::recursive_mutex> lock(m_mutex);
305  Record& myrecord = this->lookUpRecord(recordName);
306  if (myrecord.m_isNewTag) {
307  cond::throwException(std::string("Cannot delete from non-existing tag ") + myrecord.m_tag,
308  "PoolDBOutputService::appendSinceTime");
309  }
310  m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", removing iov with since " << sinceTime
311  << " pointing to payload id " << payloadId;
312  doStartTransaction();
313  cond::persistency::TransactionScope scope(m_session.transaction());
314  try {
315  cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
316  editor.erase(sinceTime, payloadId);
317  cond::UserLogInfo a = this->lookUpUserLogInfo(recordName);
318  editor.flush(a.usertext);
319 
320  } catch (const std::exception& er) {
321  cond::throwException(std::string(er.what()), "PoolDBOutputService::eraseSinceTime");
322  }
323  scope.close();
324 }
325 
327  const std::string& recordName) {
328  std::map<std::string, Record>::iterator it = m_records.find(recordName);
329  if (it == m_records.end()) {
330  cond::throwException("The record \"" + recordName + "\" has not been registered.",
331  "PoolDBOutputService::lookUpRecord");
332  }
333  return it->second;
334 }
335 
337  std::map<std::string, cond::UserLogInfo>::iterator it = m_logheaders.find(recordName);
338  if (it == m_logheaders.end())
339  throw cond::Exception("Log db was not set for record " + recordName +
340  " from PoolDBOutputService::lookUpUserLogInfo");
341  return it->second;
342 }
343 
345  std::lock_guard<std::recursive_mutex> lock(m_mutex);
346  Record& myrecord = lookUpRecord(recordName);
347  if (myrecord.m_isNewTag) {
348  cond::throwException(std::string("Cannot close non-existing tag ") + myrecord.m_tag,
349  "PoolDBOutputService::closeIOV");
350  }
351  m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", closing with end of validity " << lastTill;
352  doStartTransaction();
353  cond::persistency::TransactionScope scope(m_session.transaction());
354  try {
355  cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
356  editor.setEndOfValidity(lastTill);
357  editor.flush("Tag closed.");
358  } catch (const std::exception& er) {
359  cond::throwException(std::string(er.what()), "PoolDBOutputService::closeIOV");
360  }
361  scope.close();
362 }
363 
365  const std::string& dataprovenance,
366  const std::string& usertext) {
367  cond::UserLogInfo& myloginfo = this->lookUpUserLogInfo(recordName);
368  myloginfo.provenance = dataprovenance;
369  myloginfo.usertext = usertext;
370 }
371 
372 // Still required.
374  Record& record = lookUpRecord(recordName);
375  result.name = record.m_tag;
376  m_logger.logDebug() << "Fetching tag info for " << record.m_tag;
377  doStartTransaction();
378  bool ret = false;
379  cond::persistency::TransactionScope scope(m_session.transaction());
380  try {
381  //use iovproxy to find out.
382  if (m_session.existsIov(record.m_tag)) {
383  cond::persistency::IOVProxy iov = m_session.readIov(record.m_tag);
384  result.lastInterval = iov.getLast();
385  ret = true;
386  }
387  } catch (const std::exception& er) {
388  cond::throwException(std::string(er.what()), "PoolDBOutputService::tagInfo");
389  }
390  scope.close();
391  return ret;
392 }
393 
394 // Still required.
396  std::lock_guard<std::recursive_mutex> lock(m_mutex);
397  return getTagInfo(recordName, result);
398 }
static thread_local int s_streamIndex
const TimeTypeSpecs timeTypeSpecs[]
Definition: Time.cc:16
T getUntrackedParameter(std::string const &, T const &) const
tuple ret
prodAgent to be discontinued
Base exception class for the object to relational access.
Definition: Exception.h:11
Iov_t lastInterval
Definition: Types.h:73
void preEventProcessing(edm::StreamContext const &)
void fillRecord(edm::ParameterSet &pset, const std::string &gTimeTypeStr)
Time_t beginValue
Definition: Time.h:41
std::string provenance
Definition: Types.h:23
std::map< std::string, cond::UserLogInfo > m_logheaders
cond::persistency::Session session() const
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:60
void setDescription(const std::string &description)
Definition: IOVEditor.cc:139
tuple recordName
Definition: align_cfg.py:66
void preGlobalBeginLumi(edm::GlobalContext const &)
assert(be >=bs)
void throwException(const std::string &message, const std::string &methodName)
Definition: Exception.cc:18
std::string tag(const std::string &recordName)
std::string name
Definition: Types.h:72
tuple result
Definition: mps_fire.py:311
void appendSinceTime(const T *payloadPtr, cond::Time_t sinceTime, const std::string &recordName)
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
void closeIOV(Time_t lastTill, const std::string &recordName)
unsigned long long Time_t
Definition: Time.h:14
vector< ParameterSet > Parameters
TimeType timeTypeFromName(const std::string &name)
Definition: Time.cc:25
bool isNewTagRequest(const std::string &recordName)
Record & lookUpRecord(const std::string &recordName)
RunNumber_t run() const
uint64_t value() const
void erase(cond::Time_t since, const cond::Hash &payloadHash)
Definition: IOVEditor.cc:174
void setLogHeaderForRecord(const std::string &recordName, const std::string &provenance, const std::string &usertext)
StreamID const & streamID() const
Definition: StreamContext.h:54
std::map< std::string, Record > m_records
void insert(cond::Time_t since, const cond::Hash &payloadHash, bool checkType=false)
Definition: IOVEditor.cc:159
unsigned int value() const
Definition: StreamID.h:43
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
void preGlobalBeginRun(edm::GlobalContext const &)
bool getTagInfo(const std::string &recordName, cond::TagInfo_t &result)
std::string usertext
Definition: Types.h:24
cond::persistency::Session newReadOnlySession(const std::string &connectionString, const std::string &transactionId)
double a
Definition: hdecay.h:119
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void setEndOfValidity(cond::Time_t validity)
Definition: IOVEditor.cc:129
Time_t endValue
Definition: Time.h:42
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
bool tagInfo(const std::string &recordName, cond::TagInfo_t &result)
void eraseSinceTime(const std::string &payloadId, cond::Time_t sinceTime, const std::string &recordName)
cond::UserLogInfo & lookUpUserLogInfo(const std::string &recordName)
TimeValue_t value() const
Definition: Timestamp.h:45
PoolDBOutputService(const edm::ParameterSet &iConfig, edm::ActivityRegistry &iAR)
void createNewIOV(const T *payloadPtr, cond::Time_t firstSinceTime, cond::Time_t, const std::string &recordName)
Timestamp const & timestamp() const
Definition: StreamContext.h:62