CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
PoolDBOutputService.cc
Go to the documentation of this file.
3 //#include "CondCore/DBCommon/interface/TagInfo.h"
4 //#include "CondCore/DBCommon/interface/IOVInfo.h"
15 //
16 #include <vector>
17 #include<memory>
18 #include <cassert>
19 
20 // For PoolDBOutputService::currentTime() to work we have to keep track
21 // of which stream is presently being processed on a given thread during the call of
22 // a module which calls that method.
// Per-thread value; -1 is the "no stream recorded" sentinel that currentTime()
// asserts against before indexing m_currentTimes.
23 static thread_local int s_streamIndex = -1;
24 
// Builds one Record from a parameter set and registers it in m_callbacks under
// the record's name.
// NOTE(review): the signature line (listing line 26) and listing line 35 are
// absent from this listing; the page's symbol index gives
// `void fillRecord(edm::ParameterSet &pset)` -- confirm against the real source.
25 void
27  Record thisrecord;
28 
// "record" and "tag" are read with getParameter (no default supplied here).
29  thisrecord.m_idName = pset.getParameter<std::string>("record");
30  thisrecord.m_tag = pset.getParameter<std::string>("tag");
31 
// Per-record "closeIOV" falls back to the service-wide m_closeIOV value.
32  thisrecord.m_closeIOV =
33  pset.getUntrackedParameter<bool>("closeIOV", m_closeIOV);
34 
36 
// std::map::insert leaves an already-present key untouched, so a second
// parameter set with the same record name does not replace the first one.
37  m_callbacks.insert(std::make_pair(thisrecord.m_idName,thisrecord));
38 
39  // *** THE NEW LOGGING still has to be defined and implemented.
// A default-constructed log header is registered only when a log connection
// string has been configured.
40  if( !m_logConnectionString.empty() ){
41  cond::UserLogInfo userloginfo;
42  m_logheaders.insert(std::make_pair(thisrecord.m_idName,userloginfo));
43  }
44 }
45 
// Constructor body of PoolDBOutputService (member-initializer list onwards).
// NOTE(review): the constructor header (listing line 46) and listing lines 63,
// 73 and 91 are absent from this listing; the symbol index gives
// `PoolDBOutputService(const edm::ParameterSet &iConfig, edm::ActivityRegistry &iAR)`.
// The declarations of `connection` and `conn` used below are on the missing lines.
47  m_timetypestr(""),
48  m_currentTimes{},
49  m_session(),
50  m_logConnectionString(""),
51  m_logdb(),
52  m_dbstarted( false ),
53  m_callbacks(),
54  m_closeIOV(false),
55  m_logheaders()
56 {
57  m_closeIOV=iConfig.getUntrackedParameter<bool>("closeIOV",m_closeIOV);
58 
// The time type defaults to "runnumber" when "timetype" is not configured.
59  m_timetypestr=iConfig.getUntrackedParameter< std::string >("timetype","runnumber");
60  m_timetype = cond::time::timeTypeFromName( m_timetypestr );
61 
62  edm::ParameterSet connectionPset = iConfig.getParameter<edm::ParameterSet>("DBParameters");
64  connection.setParameters( connectionPset );
65  connection.configure();
66  std::string connectionString = iConfig.getParameter<std::string>("connect");
// An UNKNOWN_DB "dbFormat" value is mapped to DEFAULT_DB.
67  BackendType backType = (BackendType) iConfig.getUntrackedParameter<int>("dbFormat", DEFAULT_DB );
68  if( backType == UNKNOWN_DB ) backType = DEFAULT_DB;
69  m_session = connection.createSession( connectionString, true, backType );
70 
// The log database is optional: m_logdb is only created when "logconnect" exists.
71  if( iConfig.exists("logconnect") ){
72  m_logConnectionString = iConfig.getUntrackedParameter<std::string>("logconnect");
74  conn.configuration().setParameters( connectionPset );
75  conn.configure();
76  cond::DbSession logSession = conn.createSession();
77  m_logdb.reset( new cond::Logger( logSession ) );
78  }
79 
// Register every entry of "toPut" as a record.
80  typedef std::vector< edm::ParameterSet > Parameters;
81  Parameters toPut=iConfig.getParameter<Parameters>("toPut");
82  for(Parameters::iterator itToPut = toPut.begin(); itToPut != toPut.end(); ++itToPut)
83  fillRecord( *itToPut);
84 
85 
86  iAR.watchPostEndJob(this,&cond::service::PoolDBOutputService::postEndJob);
// m_currentTimes gets one slot per stream once the bounds are known.
87  iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
88  m_currentTimes.resize(iBounds.maxNumberOfStreams());
89  });
// Which callbacks keep m_currentTimes / s_streamIndex up to date depends on
// the configured time type.
90  if( m_timetype == cond::timestamp ){ //timestamp
92  iAR.watchPreModuleEvent(this, &cond::service::PoolDBOutputService::preModuleEvent);
93  iAR.watchPostModuleEvent(this, &cond::service::PoolDBOutputService::postModuleEvent);
94  } else if( m_timetype == cond::runnumber ){//runnumber
95  //NOTE: this assumes only one run is being processed at a time.
96  // This is true for 7_1_X but plans are to allow multiple in flight at a time
// s_streamIndex is thread_local, so this assignment affects only the thread
// running the constructor.
97  s_streamIndex = 0;
98  iAR.watchPreGlobalBeginRun(this,&cond::service::PoolDBOutputService::preGlobalBeginRun);
99  } else if( m_timetype == cond::lumiid ){
100  //NOTE: this assumes only one lumi is being processed at a time.
101  // This is true for 7_1_X but plans are to allow multiple in flight at a time
102  s_streamIndex = 0;
103  iAR.watchPreGlobalBeginLumi(this,&cond::service::PoolDBOutputService::preGlobalBeginLumi);
104  }
105 }
106 
// Returns the session held in m_session.
// NOTE(review): signature missing from this listing; presumably
// `cond::persistency::Session session() const` as shown in the symbol index.
109  return m_session;
110 }
111 
// Returns the m_tag of the record found by lookUpRecord(recordName).
// NOTE(review): signature missing from this listing; presumably
// `std::string tag(const std::string &recordName)` per the symbol index.
114  return this->lookUpRecord(recordName).m_tag;
115 }
116 
// Returns the m_isNewTag flag of the record found by lookUpRecord(recordName).
// NOTE(review): signature line (listing line 118) is missing; presumably
// `isNewTagRequest(const std::string &recordName)` per the symbol index.
117 bool
119  Record& myrecord=this->lookUpRecord(recordName);
120  return myrecord.m_isNewTag;
121 }
122 
// Starts the session transaction and prepares the databases, then marks the
// service as started. The name initDB is taken from the error-context string below.
// NOTE(review): signature line (listing line 124) is missing; `forReading` is
// declared there.
123 void
125 {
126  std::lock_guard<std::recursive_mutex> lock(m_mutex);
// NOTE(review): start() receives a literal `false` whatever forReading is --
// confirm whether that is intended.
127  m_session.transaction().start(false);
128  cond::persistency::TransactionScope scope( m_session.transaction() );
129  try{
// The database is created on demand only when not opened for reading.
130  if(!forReading) {
131  if( !m_session.existsDatabase() ) m_session.createDatabase();
132  }
133  //init logdb if required
134  if(!m_logConnectionString.empty()){
135  m_logdb->connect( m_logConnectionString );
136  m_logdb->createLogDBIfNonExist();
137  }
138  } catch( const std::exception& er ){
// Any std::exception is re-raised through cond::throwException with this
// method as context.
139  cond::throwException( std::string(er.what()),"PoolDBOutputService::initDB" );
140  }
141  scope.close();
// Set only after the steps above succeeded; lookUpRecord() tests this flag.
142  m_dbstarted=true;
143 }
144 
// Commits the session transaction if initDB() has marked the service as
// started, then clears the flag.
// NOTE(review): signature line (listing line 146) is missing; presumably this
// is postEndJob, the callback the constructor passes to watchPostEndJob -- confirm.
145 void
147 {
148  if( m_dbstarted) {
149  m_session.transaction().commit();
150  m_dbstarted = false;
151  }
152 }
153 
// Stores the context's timestamp value in the m_currentTimes slot of the
// context's stream.
// NOTE(review): signature line (listing line 155) is missing; presumably
// `preEventProcessing(edm::StreamContext const &)` per the symbol index.
154 void
156 {
157  m_currentTimes[iContext.streamID().value()] = iContext.timestamp().value();
158 }
159 
// Records the context's stream id in the thread_local s_streamIndex.
// NOTE(review): signature line (listing line 161) is missing; presumably
// preModuleEvent, registered via watchPreModuleEvent in the constructor -- confirm.
160 void
162  s_streamIndex = iContext.streamID().value();
163 }
164 
// Resets the thread_local s_streamIndex to the -1 sentinel.
// NOTE(review): signature line (listing line 166) is missing; presumably
// postModuleEvent, registered via watchPostModuleEvent in the constructor -- confirm.
165 void
167  s_streamIndex = -1;
168 }
169 
// Overwrites every slot of m_currentTimes with the run number taken from the
// context's luminosity block id.
// NOTE(review): signature line (listing line 171) is missing; presumably
// `preGlobalBeginRun(edm::GlobalContext const &)` per the symbol index.
170 void
172  for( auto& time : m_currentTimes) {
173  time = iContext.luminosityBlockID().run();
174  }
175 }
176 
// Overwrites every slot of m_currentTimes with the value() of the context's
// luminosity block id.
// NOTE(review): signature line (listing line 178) is missing; presumably
// `preGlobalBeginLumi(edm::GlobalContext const &)` per the symbol index.
177 void
179  for( auto& time : m_currentTimes) {
180  time = iContext.luminosityBlockID().value();
181  }
182 }
183 
185 }
186 
187 
// Returns the endValue of the timeTypeSpecs entry for the configured time type.
// NOTE(review): signature missing from this listing; presumably endOfTime() -- confirm.
190  return timeTypeSpecs[m_timetype].endValue;
191 }
192 
// Returns the beginValue of the timeTypeSpecs entry for the configured time type.
// NOTE(review): signature missing from this listing; presumably beginOfTime() -- confirm.
195  return timeTypeSpecs[m_timetype].beginValue;
196 }
197 
// Returns the m_currentTimes slot selected by the calling thread's s_streamIndex.
// The assert fires (in builds with assertions enabled) when no stream index
// has been recorded on this thread.
// NOTE(review): signature missing from this listing; presumably currentTime(),
// the method named in the comment above s_streamIndex.
200  assert(-1 != s_streamIndex);
201  return m_currentTimes[s_streamIndex];
202 }
203 
// Creates the IOV sequence for a record's tag with an explicitly given payload
// type and inserts the first payload at firstSinceTime.
// NOTE(review): the signature line (listing line 205) is missing; it declares
// at least `firstPayloadId`, which the body uses.
// firstTillTime is not referenced anywhere in the visible body.
204 void
206  const std::string payloadType,
207  cond::Time_t firstSinceTime,
208  cond::Time_t firstTillTime,
209  const std::string& recordName,
210  bool withlogging){
211  std::lock_guard<std::recursive_mutex> lock(m_mutex);
212 
213  cond::persistency::TransactionScope scope( m_session.transaction() );
214  Record& myrecord=this->lookUpRecord(recordName);
// Only tags that lookUpRecord() reports as new may be created here.
215  if(!myrecord.m_isNewTag) {
216  cond::throwException( myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
217  }
// iovToken is declared but never used in the visible body.
218  std::string iovToken;
// Logging was requested but no log connection string is configured.
219  if( withlogging && m_logConnectionString.empty() )
220  cond::throwException("Log db was not set from PoolDBOutputService::createNewIOV",
221  "PoolDBOutputService::createNewIOV");
222 
223  try{
224  // FIX ME: synchronization type and description have to be passed as the other parameters?
225  cond::persistency::IOVEditor editor = m_session.createIov( payloadType, myrecord.m_tag, myrecord.m_timetype, cond::OFFLINE );
226  editor.setDescription( "New Tag" );
227  editor.insert( firstSinceTime, firstPayloadId );
228  editor.flush();
// After a successful flush the tag is no longer treated as new.
229  myrecord.m_isNewTag=false;
230  if(withlogging){
231  std::string destconnect=m_session.connectionString();
232  cond::UserLogInfo a=this->lookUpUserLogInfo(recordName);
233  m_logdb->logOperationNow(a,destconnect,payloadType,firstPayloadId,myrecord.m_tag,myrecord.timetypestr(),0,firstSinceTime);
234  }
235  }catch(const std::exception& er){
// On failure: record the failed operation (when logging), then re-raise
// through cond::throwException.
236  if(withlogging){
237  std::string destconnect=m_session.connectionString();
238  cond::UserLogInfo a=this->lookUpUserLogInfo(recordName);
239  m_logdb->logFailedOperationNow(a,destconnect,payloadType,firstPayloadId,myrecord.m_tag,myrecord.timetypestr(),0,firstSinceTime,std::string(er.what()));
240  }
241  cond::throwException(std::string(er.what()) + " from PoolDBOutputService::createNewIOV ",
242  "PoolDBOutputService::createNewIOV");
243  }
244  scope.close();
245 }
246 
// Creates the IOV sequence for a record's tag from the first payload id
// (via createIovForPayload) and inserts that payload at firstSinceTime; the
// payload type is read back from the editor.
// NOTE(review): the signature line (listing line 248) and listing line 264 are
// missing; they declare `firstPayloadId` and `payloadType` respectively
// (presumably) -- confirm against the real source.
// firstTillTime is not referenced anywhere in the visible body.
247 void
249  cond::Time_t firstSinceTime,
250  cond::Time_t firstTillTime,
251  const std::string& recordName,
252  bool withlogging){
253  std::lock_guard<std::recursive_mutex> lock(m_mutex);
254  cond::persistency::TransactionScope scope( m_session.transaction() );
255  Record& myrecord=this->lookUpRecord(recordName);
// Only tags that lookUpRecord() reports as new may be created here.
256  if(!myrecord.m_isNewTag) {
257  cond::throwException( myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
258  }
// iovToken is declared but never used in the visible body.
259  std::string iovToken;
// Logging was requested but no log connection string is configured.
260  if( withlogging && m_logConnectionString.empty() )
261  cond::throwException("Log db was not set from PoolDBOutputService::createNewIOV",
262  "PoolDBOutputService::createNewIOV");
263 
265  try{
266  // FIX ME: synchronization type and description have to be passed as the other parameters?
267  cond::persistency::IOVEditor editor = m_session.createIovForPayload( firstPayloadId, myrecord.m_tag, myrecord.m_timetype, cond::OFFLINE );
268  editor.setDescription( "New Tag" );
// payloadType is assigned inside the try block; if createIovForPayload or
// setDescription throws, the catch block logs whatever value it had before.
269  payloadType = editor.payloadType();
270  editor.insert( firstSinceTime, firstPayloadId );
271  editor.flush();
// After a successful flush the tag is no longer treated as new.
272  myrecord.m_isNewTag=false;
273  if(withlogging){
274  std::string destconnect=m_session.connectionString();
275  cond::UserLogInfo a=this->lookUpUserLogInfo(recordName);
276  m_logdb->logOperationNow(a,destconnect,payloadType,firstPayloadId,myrecord.m_tag,myrecord.timetypestr(),0,firstSinceTime);
277  }
278  }catch(const std::exception& er){
// On failure: record the failed operation (when logging), then re-raise
// through cond::throwException.
279  if(withlogging){
280  std::string destconnect=m_session.connectionString();
281  cond::UserLogInfo a=this->lookUpUserLogInfo(recordName);
282  m_logdb->logFailedOperationNow(a,destconnect,payloadType,firstPayloadId,myrecord.m_tag,myrecord.timetypestr(),0,firstSinceTime,std::string(er.what()));
283  }
284  cond::throwException(std::string(er.what()) + " from PoolDBOutputService::createNewIOV ",
285  "PoolDBOutputService::createNewIOV");
286  }
287  scope.close();
288 }
289 
// Appends a payload at the given since-time to the IOV sequence of an
// already-existing tag.
// NOTE(review): listing lines 291-292 (signature, declaring `payloadId` and
// `time`), 305 (presumably the `payloadType` declaration) and 323 (start of the
// rethrow call in the catch block) are missing from this listing.
290 void
293  const std::string& recordName,
294  bool withlogging) {
295  std::lock_guard<std::recursive_mutex> lock(m_mutex);
296  cond::persistency::TransactionScope scope( m_session.transaction() );
297  Record& myrecord=this->lookUpRecord(recordName);
// Appending requires that the tag already exists.
298  if( myrecord.m_isNewTag ) {
299  cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
300  "PoolDBOutputService::appendSinceTime");
301  }
// NOTE(review): both the message and the context string below name
// createNewIOV although the other errors in this method use appendSinceTime
// -- looks like a copy-paste leftover; confirm before changing the text.
302  if( withlogging && m_logConnectionString.empty() )
303  cond::throwException("Log db was not set from PoolDBOutputService::createNewIOV",
304  "PoolDBOutputService::createNewIOV");
306  try{
307  cond::persistency::IOVEditor editor = m_session.editIov( myrecord.m_tag );
308  payloadType = editor.payloadType();
309  editor.insert( time, payloadId );
310  editor.flush();
311 
312  if(withlogging){
313  std::string destconnect=m_session.connectionString();
314  cond::UserLogInfo a=this->lookUpUserLogInfo(recordName);
315  m_logdb->logOperationNow(a,destconnect,payloadType,payloadId,myrecord.m_tag,myrecord.timetypestr(),0,time);
316  }
317  }catch(const std::exception& er){
// On failure: record the failed operation (when logging) before the error is
// re-raised with appendSinceTime as context.
318  if(withlogging){
319  std::string destconnect=m_session.connectionString();
320  cond::UserLogInfo a=this->lookUpUserLogInfo(recordName);
321  m_logdb->logFailedOperationNow(a,destconnect,payloadType,payloadId,myrecord.m_tag,myrecord.timetypestr(),0,time,std::string(er.what()));
322  }
324  "PoolDBOutputService::appendSinceTime");
325  }
326  scope.close();
327 }
328 
// Finds the Record registered under recordName, refreshes its m_isNewTag flag
// from the database and returns a reference to it.
// NOTE(review): signature lines are missing from this listing; the symbol
// index gives `Record & lookUpRecord(const std::string &recordName)`.
331  std::lock_guard<std::recursive_mutex> lock(m_mutex);
// Lazily initialise the database on first use.
332  if (!m_dbstarted) this->initDB( false );
333  cond::persistency::TransactionScope scope( m_session.transaction() );
334  std::map<std::string,Record>::iterator it=m_callbacks.find(recordName);
// `it` is dereferenced below, so throwException is relied upon not to return.
335  if(it==m_callbacks.end()) {
336  cond::throwException("The record \""+recordName +"\" has not been registered.","PoolDBOutputService::lookUpRecord");
337  }
// The flag is recomputed on every call: new tag <=> no IOV exists for m_tag.
338  if( !m_session.existsIov( it->second.m_tag) ){
339  it->second.m_isNewTag=true;
340  } else {
341  it->second.m_isNewTag=false;
342  }
343  scope.close();
344  return it->second;
345 }
346 
// Returns a reference to the log header stored in m_logheaders for recordName;
// throws cond::Exception when no header is registered under that name.
// NOTE(review): signature lines are missing from this listing; the symbol index
// gives `cond::UserLogInfo & lookUpUserLogInfo(const std::string &recordName)`.
349  std::map<std::string,cond::UserLogInfo>::iterator it=m_logheaders.find(recordName);
350  if(it==m_logheaders.end()) throw cond::Exception("Log db was not set for record " + recordName + " from PoolDBOutputService::lookUpUserLogInfo");
351  return it->second;
352 }
353 
// Sets the end of validity of an existing tag's IOV sequence to lastTill.
// NOTE(review): signature line (listing line 355) is missing; the symbol index
// gives `closeIOV(Time_t lastTill, const std::string &recordName, bool withlogging=false)`.
// withlogging is not referenced anywhere in the visible body.
354 void
356  bool withlogging) {
357  std::lock_guard<std::recursive_mutex> lock(m_mutex);
358  // not fully working; not to be used for now.
// Unlike the create/append methods, the record is looked up before the
// TransactionScope is opened.
359  Record & myrecord = lookUpRecord(recordName);
360  cond::persistency::TransactionScope scope( m_session.transaction() );
361 
362  if( myrecord.m_isNewTag ) {
363  cond::throwException(std::string("Cannot close non-existing tag ") + myrecord.m_tag,
364  "PoolDBOutputService::closeIOV");
365  }
366  cond::persistency::IOVEditor editor = m_session.editIov( myrecord.m_tag );
367  editor.setEndOfValidity( lastTill );
368  editor.flush();
369  scope.close();
370 }
371 
372 
// Overwrites the provenance and usertext fields of the log header registered
// for recordName; lookUpUserLogInfo throws if no header is registered.
// NOTE(review): signature line (listing line 374) is missing; it declares
// `dataprovenance` and `usertext`. Presumably setLogHeaderForRecord per the
// symbol index -- confirm.
373 void
375 {
376  cond::UserLogInfo& myloginfo=this->lookUpUserLogInfo(recordName);
377  myloginfo.provenance=dataprovenance;
378  myloginfo.usertext=usertext;
379 }
380 
381 //
// Returns the logger held by m_logdb; throws cond::Exception when m_logdb is empty.
// NOTE(review): signature line (listing line 383) is missing; the symbol index
// gives `const cond::Logger & queryLog() const`.
382 const cond::Logger&
384  if( !m_logdb.get() ) throw cond::Exception("Log database is not set from PoolDBOutputService::queryLog");
385  return *m_logdb;
386 }
387 
388 // Still required.
// Fills `result` with the tag name and IOV sequence size of a record's tag
// and, when the sequence is non-empty, its last interval and last payload id.
// NOTE(review): signature line (listing line 390) is missing; the symbol index
// gives `tagInfo(const std::string &recordName, cond::TagInfo_t &result)`.
389 void
391  //
392  std::lock_guard<std::recursive_mutex> lock(m_mutex);
393  Record& record = lookUpRecord(recordName);
394  result.name=record.m_tag;
395  //use iovproxy to find out.
396  cond::persistency::IOVProxy iov = m_session.readIov( record.m_tag );
397  result.size=iov.sequenceSize();
// lastInterval and lastPayloadToken are left untouched for an empty sequence.
398  if (result.size>0) {
399  cond::Iov_t last = iov.getLast();
400  result.lastInterval = cond::ValidityInterval( last.since, last.till );
401  result.lastPayloadToken = last.payloadId;
402  }
403 }
static thread_local int s_streamIndex
const TimeTypeSpecs timeTypeSpecs[]
Definition: Time.cc:22
T getParameter(std::string const &) const
std::string usertext
Definition: LogDBEntry.h:9
T getUntrackedParameter(std::string const &, T const &) const
boost::uint64_t value() const
void closeIOV(Time_t lastTill, const std::string &recordName, bool withlogging=false)
void preEventProcessing(edm::StreamContext const &)
Time_t beginValue
Definition: Time.h:45
JetCorrectorParameters::Record record
Definition: classes.h:7
void fillRecord(edm::ParameterSet &pset)
std::map< std::string, cond::UserLogInfo > m_logheaders
cond::persistency::Session session() const
assert(m_qm.get())
Time_t since
Definition: Types.h:51
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:52
size_t size
Definition: Types.h:72
std::pair< Time_t, Time_t > ValidityInterval
Definition: Time.h:19
void setDescription(const std::string &description)
Definition: IOVEditor.cc:116
void preGlobalBeginLumi(edm::GlobalContext const &)
void setParameters(const edm::ParameterSet &connectionPset)
std::string tag(const std::string &recordName)
std::string name
Definition: Types.h:68
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:43
cond::ValidityInterval lastInterval
Definition: Types.h:70
void appendSinceTime(T *payloadObj, cond::Time_t sinceTime, const std::string &recordName, bool withlogging=false)
unsigned long long Time_t
Definition: Time.h:16
vector< ParameterSet > Parameters
TimeType timeTypeFromName(const std::string &name)
Definition: Time.cc:24
tuple iov
Definition: o2o.py:307
bool isNewTagRequest(const std::string &recordName)
Record & lookUpRecord(const std::string &recordName)
tuple result
Definition: query.py:137
std::string provenance
Definition: LogDBEntry.h:8
Hash payloadId
Definition: Types.h:53
RunNumber_t run() const
void createNewIOV(T *firstPayloadObj, cond::Time_t firstSinceTime, cond::Time_t firstTillTime, const std::string &recordName, bool withlogging=false)
BackendType
Definition: Types.h:23
void setLogHeaderForRecord(const std::string &recordName, const std::string &provenance, const std::string &usertext)
StreamID const & streamID() const
Definition: StreamContext.h:57
void insert(cond::Time_t since, const cond::Hash &payloadHash, bool checkType=false)
Definition: IOVEditor.cc:138
unsigned int value() const
Definition: StreamID.h:46
void throwException(std::string const &message, std::string const &methodName)
Definition: Exception.cc:17
tuple editor
Definition: idDealer.py:73
void tagInfo(const std::string &recordName, cond::TagInfo_t &result)
std::string lastPayloadToken
Definition: Types.h:71
void preGlobalBeginRun(edm::GlobalContext const &)
static constexpr BackendType DEFAULT_DB
Definition: Types.h:24
double a
Definition: hdecay.h:121
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void setEndOfValidity(cond::Time_t validity)
Definition: IOVEditor.cc:105
std::map< std::string, Record > m_callbacks
Time_t endValue
Definition: Time.h:46
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::string payloadType() const
Definition: IOVEditor.cc:93
cond::UserLogInfo & lookUpUserLogInfo(const std::string &recordName)
TimeValue_t value() const
Definition: Timestamp.h:56
tuple conn
Definition: results_mgr.py:53
const cond::Logger & queryLog() const
PoolDBOutputService(const edm::ParameterSet &iConfig, edm::ActivityRegistry &iAR)
Timestamp const & timestamp() const
Definition: StreamContext.h:62
Time_t till
Definition: Types.h:52