CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
PoolDBOutputService.cc
Go to the documentation of this file.
3 //#include "CondCore/DBCommon/interface/TagInfo.h"
4 //#include "CondCore/DBCommon/interface/IOVInfo.h"
13 //
14 #include <vector>
15 #include<memory>
16 #include <cassert>
17 
18 // For PoolDBOutputService::currentTime() to work we have to keep track
19 // of which stream is presently being processed on a given thread during the call of
20 // a module which calls that method.
// -1 is the "unset" value: it is the initial value on every thread, and the code that
// indexes m_currentTimes with this variable asserts that it is not -1.
21 static thread_local int s_streamIndex = -1;
22 
// Builds one Record from a configuration ParameterSet and registers it in m_callbacks,
// keyed by the record name.
//   pset: read for the tracked strings "record" and "tag" and for the untracked bool
//         "closeIOV", whose default is the service-wide m_closeIOV.
// NOTE(review): the signature line (source line 24) and source line 34 are missing from
// this listing; presumably this is fillRecord(edm::ParameterSet& pset) -- confirm.
23 void
25  Record thisrecord;
26 
27  thisrecord.m_idName = pset.getParameter<std::string>("record");
28  thisrecord.m_tag = pset.getParameter<std::string>("tag");
29 
30  thisrecord.m_closeIOV =
31  pset.getUntrackedParameter<bool>("closeIOV", m_closeIOV);
32 
33  //thisrecord.m_timetype=cond::findSpecs(pset.getUntrackedParameter< std::string >("timetype",m_timetypestr)).type;
35 
 // m_callbacks is a std::map, so insert() leaves an already-present key untouched:
 // a record name that is configured twice keeps its first registration.
36  m_callbacks.insert(std::make_pair(thisrecord.m_idName,thisrecord));
37 
38  // *** THE LOGGING has still to be defined and implemented.
39  //if( !m_logConnectionString.empty() ){
40  // cond::UserLogInfo userloginfo;
41  // m_logheaders.insert(std::make_pair(thisrecord.m_idName,userloginfo));
42  //}
43 }
44 
// Member-initializer list and body of the PoolDBOutputService constructor.
// Reads the service configuration from iConfig ("closeIOV", "timetype", "DBParameters",
// "connect", "dbFormat", "toPut"), creates m_session, registers one Record per "toPut"
// entry, and hooks the ActivityRegistry (iAR) callbacks that keep m_currentTimes filled.
// NOTE(review): the signature line (source line 45) and source lines 63 and 88 are
// missing from this listing; `connection` is declared on the missing line 63.
46  m_timetypestr(""),
47  m_currentTimes{},
48  m_session(),
49  //m_logConnectionString(""),
50  //m_logdb(),
51  m_dbstarted( false ),
52  m_callbacks(),
53  //m_newtags(),
54  m_closeIOV(false)//,
55  //m_logheaders()
56 {
57  m_closeIOV=iConfig.getUntrackedParameter<bool>("closeIOV",m_closeIOV);
58 
 // The time type defaults to "runnumber" when the configuration does not set it.
59  m_timetypestr=iConfig.getUntrackedParameter< std::string >("timetype","runnumber");
60  m_timetype = cond::time::timeTypeFromName( m_timetypestr );
61 
62  edm::ParameterSet connectionPset = iConfig.getParameter<edm::ParameterSet>("DBParameters");
64  connection.setParameters( connectionPset );
65  connection.configure();
66  std::string connectionString = iConfig.getParameter<std::string>("connect");
67  BackendType backType = (BackendType) iConfig.getUntrackedParameter<int>("dbFormat", DEFAULT_DB );
68  if( backType == UNKNOWN_DB ) backType = DEFAULT_DB;
69  m_session = connection.createSession( connectionString, true, backType );
70 
71  //if( iConfig.exists("logconnect") ){
72  // m_logConnectionString = iConfig.getUntrackedParameter<std::string>("logconnect");
73  // cond::DbSession logSession = connection.createSession();
74  // m_logdb.reset( new cond::Logger( logSession ) );
75  //}
76 
 // Register one Record for every ParameterSet listed under "toPut".
77  typedef std::vector< edm::ParameterSet > Parameters;
78  Parameters toPut=iConfig.getParameter<Parameters>("toPut");
79  for(Parameters::iterator itToPut = toPut.begin(); itToPut != toPut.end(); ++itToPut)
80  fillRecord( *itToPut);
81 
82 
83  iAR.watchPostEndJob(this,&cond::service::PoolDBOutputService::postEndJob);
 // One current-time slot per stream, sized once the framework reports its bounds.
84  iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
85  m_currentTimes.resize(iBounds.maxNumberOfStreams());
86  });
87  if( m_timetype == cond::timestamp ){ //timestamp
89  iAR.watchPreModuleEvent(this, &cond::service::PoolDBOutputService::preModuleEvent);
90  iAR.watchPostModuleEvent(this, &cond::service::PoolDBOutputService::postModuleEvent);
91  } else if( m_timetype == cond::runnumber ){//runnumber
92  //NOTE: this assumes only one run is being processed at a time.
93  // This is true for 7_1_X but plans are to allow multiple in flight at a time
 // NOTE(review): s_streamIndex is thread_local, so this assignment only affects the
 // thread that runs the constructor -- confirm the index is never read (and asserted
 // on) from another thread in this mode.
94  s_streamIndex = 0;
95  iAR.watchPreGlobalBeginRun(this,&cond::service::PoolDBOutputService::preGlobalBeginRun);
96  } else if( m_timetype == cond::lumiid ){
97  //NOTE: this assumes only one lumi is being processed at a time.
98  // This is true for 7_1_X but plans are to allow multiple in flight at a time
 // NOTE(review): same thread_local caveat as in the run-number branch.
99  s_streamIndex = 0;
100  iAR.watchPreGlobalBeginLumi(this,&cond::service::PoolDBOutputService::preGlobalBeginLumi);
101  }
102 }
103 
// Returns the service's database session, m_session.
// NOTE(review): the signature (source lines 104-105) is missing from this listing;
// presumably the session() accessor -- confirm.
106  return m_session;
107 }
108 
// Returns the tag (m_tag) of the Record registered under recordName.
// Goes through lookUpRecord(), so an unregistered record name raises an exception there.
// NOTE(review): the signature (source lines 109-110) is missing from this listing;
// presumably tag(const std::string& recordName) -- confirm.
111  return this->lookUpRecord(recordName).m_tag;
112 }
113 
// Returns the m_isNewTag flag of the Record registered under recordName.
// lookUpRecord() refreshes that flag on every call: it is true when m_session reports
// no IOV for the record's tag.
// NOTE(review): the signature line (source line 115) is missing from this listing;
// presumably isNewTagRequest(const std::string& recordName) -- confirm.
114 bool
116  Record& myrecord=this->lookUpRecord(recordName);
117  return myrecord.m_isNewTag;
118 }
119 
// Starts a transaction on m_session and marks the service as started (m_dbstarted = true).
// When forReading is false, the database is created if it does not exist yet.
// A std::exception raised by that check/creation is re-raised through
// cond::throwException, tagged "PoolDBOutputService::initDB".
// m_mutex (a std::recursive_mutex) is held for the whole call.
// NOTE(review): the signature line (source line 121) is missing from this listing;
// presumably initDB(bool forReading) -- confirm.
120 void
122 {
123  std::lock_guard<std::recursive_mutex> lock(m_mutex);
 // NOTE(review): the meaning of the `false` argument is not visible here; presumably
 // "not read-only" -- confirm against the Transaction API.
124  m_session.transaction().start(false);
125  cond::persistency::TransactionScope scope( m_session.transaction() );
126  try{
127  if(!forReading) {
128  if( !m_session.existsDatabase() ) m_session.createDatabase();
129  }
130  //init logdb if required
131  //if(!m_logConnectionString.empty()){
132  // m_logdb->connect( m_logConnectionString );
133  // m_logdb->createLogDBIfNonExist();
134  //}
135  } catch( const std::exception& er ){
136  cond::throwException( std::string(er.what()),"PoolDBOutputService::initDB" );
137  }
138  scope.close();
139  m_dbstarted=true;
140 }
141 
// Commits the session transaction if the database was started (m_dbstarted) and then
// clears the flag, so a second call does nothing.
// NOTE(review): the signature line (source line 143) is missing from this listing;
// presumably postEndJob(), the callback given to watchPostEndJob -- confirm.
142 void
144 {
145  if( m_dbstarted) {
146  m_session.transaction().commit();
147  m_dbstarted = false;
148  }
149 }
150 
// Stores the timestamp value carried by iContext in the m_currentTimes slot that
// belongs to iContext's stream.
// NOTE(review): the signature line (source line 152) is missing from this listing;
// presumably preEventProcessing(edm::StreamContext const& iContext) -- confirm.
151 void
153 {
154  m_currentTimes[iContext.streamID().value()] = iContext.timestamp().value();
155 }
156 
// Records, in the thread-local s_streamIndex, the stream that iContext belongs to.
// NOTE(review): the signature line (source line 158) is missing from this listing;
// presumably preModuleEvent(edm::StreamContext const& iContext,
// edm::ModuleCallingContext const&) -- confirm.
157 void
159  s_streamIndex = iContext.streamID().value();
160 }
161 
// Resets the thread-local s_streamIndex to its "unset" value, -1.
// NOTE(review): the signature line (source line 163) is missing from this listing;
// presumably postModuleEvent(edm::StreamContext const&,
// edm::ModuleCallingContext const&) -- confirm.
162 void
164  s_streamIndex = -1;
165 }
166 
// Sets every entry of m_currentTimes to the run number taken from iContext.
// NOTE(review): the signature line (source line 168) is missing from this listing;
// presumably preGlobalBeginRun(edm::GlobalContext const& iContext) -- confirm.
167 void
169  for( auto& time : m_currentTimes) {
170  time = iContext.luminosityBlockID().run();
171  }
172 }
173 
// Sets every entry of m_currentTimes to the value() of iContext's luminosity block ID.
// NOTE(review): the signature line (source line 175) is missing from this listing;
// presumably preGlobalBeginLumi(edm::GlobalContext const& iContext) -- confirm.
174 void
176  for( auto& time : m_currentTimes) {
177  time = iContext.luminosityBlockID().value();
178  }
179 }
180 
// NOTE(review): the line that opens this definition (source line 181) is missing from
// this listing; presumably an empty-bodied destructor -- confirm.
182 }
183 
184 
// Returns the endValue of the timeTypeSpecs entry selected by the configured m_timetype.
// NOTE(review): the signature (source lines 185-186) is missing from this listing;
// presumably endOfTime() -- confirm.
187  return timeTypeSpecs[m_timetype].endValue;
188 }
189 
// Returns the beginValue of the timeTypeSpecs entry selected by the configured m_timetype.
// NOTE(review): the signature (source lines 190-191) is missing from this listing;
// presumably beginOfTime() -- confirm.
192  return timeTypeSpecs[m_timetype].beginValue;
193 }
194 
// Returns the m_currentTimes entry selected by the thread-local s_streamIndex.
// Precondition (asserted): s_streamIndex has been set on the calling thread.
// In builds with NDEBUG the assert is compiled out, and an unset index (-1) would then
// be used directly as the subscript.
// NOTE(review): the signature (source lines 195-196) is missing from this listing;
// presumably PoolDBOutputService::currentTime() -- confirm.
197  assert(-1 != s_streamIndex);
198  return m_currentTimes[s_streamIndex];
199 }
200 
// Creates the IOV sequence for the tag of the record `recordName` and inserts the first
// payload at firstSinceTime.
//   payloadType    : payload type name passed to m_session.createIov().
//   firstSinceTime : "since" time of the first interval.
//   firstTillTime  : not used by the visible body.
//   recordName     : name under which the Record was registered.
//   withlogging    : only referenced from commented-out logging code.
// Raises (via cond::throwException) when the tag is not new, i.e. m_isNewTag is false,
// and re-raises any std::exception from the editor calls with the method name appended.
// On success m_isNewTag is set to false. m_mutex is held for the whole call.
// NOTE(review): the first signature line (source line 202), which declares
// firstPayloadId, is missing from this listing. `iovToken` is declared but never used.
201 void
203  const std::string payloadType,
204  cond::Time_t firstSinceTime,
205  cond::Time_t firstTillTime,
206  const std::string& recordName,
207  bool withlogging){
208  std::lock_guard<std::recursive_mutex> lock(m_mutex);
209 
210  cond::persistency::TransactionScope scope( m_session.transaction() );
211  Record& myrecord=this->lookUpRecord(recordName);
212  if(!myrecord.m_isNewTag) {
213  cond::throwException( myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
214  }
215  std::string iovToken;
216  //if(withlogging){
217  // if( m_logConnectionString.empty() ) {
218  // throw cond::db::Exception("Log db was not set from PoolDBOutputService::createNewIOV",
219  // "PoolDBOutputService::createNewIOV");
220  // }
221  //}
222 
223  try{
224  // FIX ME: synchronization type and description have to be passed as the other parameters?
225  cond::persistency::IOVEditor editor = m_session.createIov( payloadType, myrecord.m_tag, myrecord.m_timetype, cond::OFFLINE );
226  editor.setDescription( "New Tag" );
227  editor.insert( firstSinceTime, firstPayloadId );
228  editor.flush();
229  myrecord.m_isNewTag=false;
230  //if(withlogging){
231  // std::string destconnect=m_session.connectionString();
232  // cond::UserLogInfo a=this->lookUpUserLogInfo(recordName);
233  // m_logdb->logOperationNow(a,destconnect,objClass,objToken,myrecord.m_tag,myrecord.timetypestr(),payloadIdx,firstSinceTime);
234  //}
235  }catch(const std::exception& er){
236  //if(withlogging){
237  // std::string destconnect=m_session.connectionString();
238  // cond::UserLogInfo a=this->lookUpUserLogInfo(recordName);
239  // m_logdb->logFailedOperationNow(a,destconnect,objClass,objToken,myrecord.m_tag,myrecord.timetypestr(),payloadIdx,firstSinceTime,std::string(er.what()));
240  //}
241  cond::throwException(std::string(er.what()) + " from PoolDBOutputService::createNewIOV ",
242  "PoolDBOutputService::createNewIOV");
243  }
244  scope.close();
245 }
246 
// Variant of IOV creation that takes no payload type name: the IOV editor is obtained
// from m_session.createIovForPayload( firstPayloadId, ... ) instead of createIov().
//   firstSinceTime : "since" time of the first interval.
//   firstTillTime  : not used by the visible body.
//   recordName     : name under which the Record was registered.
//   withlogging    : not used by the visible body.
// Raises (via cond::throwException) when the tag is not new, i.e. m_isNewTag is false,
// and re-raises any std::exception from the editor calls with the method name appended.
// On success m_isNewTag is set to false. m_mutex is held for the whole call.
// NOTE(review): the first signature line (source line 248), which declares
// firstPayloadId, is missing from this listing; the error strings name the method
// createNewIOV. `iovToken` is declared but never used.
247 void
249  cond::Time_t firstSinceTime,
250  cond::Time_t firstTillTime,
251  const std::string& recordName,
252  bool withlogging){
253  std::lock_guard<std::recursive_mutex> lock(m_mutex);
254  cond::persistency::TransactionScope scope( m_session.transaction() );
255  Record& myrecord=this->lookUpRecord(recordName);
256  if(!myrecord.m_isNewTag) {
257  cond::throwException( myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
258  }
259  std::string iovToken;
260  try{
261  // FIX ME: synchronization type and description have to be passed as the other parameters?
262  cond::persistency::IOVEditor editor = m_session.createIovForPayload( firstPayloadId, myrecord.m_tag, myrecord.m_timetype, cond::OFFLINE );
263  editor.setDescription( "New Tag" );
264  editor.insert( firstSinceTime, firstPayloadId );
265  editor.flush();
266  myrecord.m_isNewTag=false;
267  }catch(const std::exception& er){
268  cond::throwException(std::string(er.what()) + " from PoolDBOutputService::createNewIOV ",
269  "PoolDBOutputService::createNewIOV");
270  }
271  scope.close();
272 }
273 
// Appends one interval (time -> payloadId) to the already existing tag of the record
// `recordName`.
//   recordName  : name under which the Record was registered.
//   withlogging : only referenced from commented-out logging code.
// Raises (via cond::throwException) when the tag does not exist yet (m_isNewTag is true).
// m_mutex is held for the whole call.
// NOTE(review): source lines 275-276 (the signature lines declaring payloadId and time)
// and source line 308 (the start of the statement in the catch block) are missing from
// this listing.
274 void
277  const std::string& recordName,
278  bool withlogging) {
279  std::lock_guard<std::recursive_mutex> lock(m_mutex);
280  cond::persistency::TransactionScope scope( m_session.transaction() );
281  Record& myrecord=this->lookUpRecord(recordName);
282  if( myrecord.m_isNewTag ) {
283  cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
284  "PoolDBOutputService::appendSinceTime");
285  }
286  //if(withlogging){
287  // if( m_logConnectionString.empty() ) {
288  // throw cond::Exception("Log db was not set from PoolDBOutputService::add");
289  // }
290  //}
291 
292  try{
293  cond::persistency::IOVEditor editor = m_session.editIov( myrecord.m_tag );
294  editor.insert( time, payloadId );
295  editor.flush();
296 
297  //if(withlogging){
298  // std::string destconnect=m_session.connectionString();
299  // cond::UserLogInfo a=this->lookUpUserLogInfo(recordName);
300  // m_logdb->logOperationNow(a,destconnect,objClass,objToken,myrecord.m_tag,myrecord.timetypestr(),payloadIdx,time);
301  //}
302  }catch(const std::exception& er){
303  //if(withlogging){
304  // std::string destconnect=m_session.connectionString();
305  // cond::UserLogInfo a=this->lookUpUserLogInfo(recordName);
306  // m_logdb->logFailedOperationNow(a,destconnect,objClass,objToken,myrecord.m_tag,myrecord.timetypestr(),payloadIdx,time,std::string(er.what()));
307  //}
309  "PoolDBOutputService::appendSinceTime");
310  }
311  scope.close();
312 }
313 
// Finds the Record registered under recordName and refreshes its m_isNewTag flag.
// Calls initDB(false) first if the database has not been started yet, then sets
// m_isNewTag to true when m_session reports no IOV for the record's tag and to false
// otherwise. Returns the Record stored in m_callbacks.
// Raises (via cond::throwException) when recordName has not been registered.
// m_mutex is held for the whole call; it is a recursive mutex, and callers in this file
// already hold it when they call in.
// NOTE(review): the signature (source lines 314-315) is missing from this listing;
// presumably Record& lookUpRecord(const std::string& recordName) -- confirm.
316  std::lock_guard<std::recursive_mutex> lock(m_mutex);
317  if (!m_dbstarted) this->initDB( false );
318  cond::persistency::TransactionScope scope( m_session.transaction() );
319  std::map<std::string,Record>::iterator it=m_callbacks.find(recordName);
320  if(it==m_callbacks.end()) {
321  cond::throwException("The record \""+recordName +"\" has not been registered.","PoolDBOutputService::lookUpRecord");
322  }
323  if( !m_session.existsIov( it->second.m_tag) ){
324  it->second.m_isNewTag=true;
325  } else {
326  it->second.m_isNewTag=false;
327  }
328  scope.close();
329  return it->second;
330 }
331 
332 //cond::UserLogInfo&
333 //cond::service::PoolDBOutputService::lookUpUserLogInfo(const std::string& recordName){
334 // std::map<std::string,cond::UserLogInfo>::iterator it=m_logheaders.find(recordName);
335 // if(it==m_logheaders.end()) throw cond::Exception("Log db was not set for record " + recordName + " from PoolDBOutputService::lookUpUserLogInfo");
336 // return it->second;
337 //}
338 
// Sets the end of validity of the existing tag of the record `recordName` to lastTill.
//   withlogging : not used by the visible body.
// Raises (via cond::throwException) when the tag does not exist yet (m_isNewTag is true).
// m_mutex is held for the whole call.
// NOTE(review): the first signature line (source line 340), which declares lastTill and
// recordName, is missing from this listing.
339 void
341  bool withlogging) {
342  std::lock_guard<std::recursive_mutex> lock(m_mutex);
343  // not fully working; not to be used for now
344  Record & myrecord = lookUpRecord(recordName);
345  cond::persistency::TransactionScope scope( m_session.transaction() );
346 
347  if( myrecord.m_isNewTag ) {
348  cond::throwException(std::string("Cannot close non-existing tag ") + myrecord.m_tag,
349  "PoolDBOutputService::closeIOV");
350  }
351  cond::persistency::IOVEditor editor = m_session.editIov( myrecord.m_tag );
352  editor.setEndOfValidity( lastTill );
353  editor.flush();
354  scope.close();
355 }
356 
357 
// Currently a no-op: the whole body is commented out, so nothing is stored.
// The commented-out code used to copy the provenance and user text into the
// UserLogInfo of the record.
// NOTE(review): the signature line (source line 359) is missing from this listing;
// presumably setLogHeaderForRecord(recordName, dataprovenance, usertext) -- confirm.
358 void
360 {
361  //cond::UserLogInfo& myloginfo=this->lookUpUserLogInfo(recordName);
362  //myloginfo.provenance=dataprovenance;
363  //myloginfo.usertext=usertext;
364 }
365 
366 //
367 //const cond::Logger&
368 //cond::service::PoolDBOutputService::queryLog()const{
369 // if( !m_logdb.get() ) throw cond::Exception("Log database is not set from PoolDBOutputService::queryLog");
370 // return *m_logdb;
371 //}
372 
373 // Still required.
// Fills `result` with information about the tag of the record `recordName`:
//   result.name             <- the record's tag
//   result.size             <- iov.sequenceSize() of that tag
//   result.lastInterval     <- (since, till) of the last IOV  } only when size > 0;
//   result.lastPayloadToken <- payload id of the last IOV     } otherwise left untouched
// m_mutex is held for the whole call.
// NOTE(review): the signature line (source line 375) is missing from this listing;
// presumably tagInfo(const std::string& recordName, cond::TagInfo_t& result) -- confirm.
374 void
376  //
377  std::lock_guard<std::recursive_mutex> lock(m_mutex);
378  Record& record = lookUpRecord(recordName);
379  result.name=record.m_tag;
380  //use iovproxy to find out.
381  cond::persistency::IOVProxy iov = m_session.readIov( record.m_tag );
382  result.size=iov.sequenceSize();
383  if (result.size>0) {
384  cond::Iov_t last = iov.getLast();
385  result.lastInterval = cond::ValidityInterval( last.since, last.till );
386  result.lastPayloadToken = last.payloadId;
387  }
388 }
static thread_local int s_streamIndex
const TimeTypeSpecs timeTypeSpecs[]
Definition: Time.cc:22
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
void closeIOV(Time_t lastTill, const std::string &recordName, bool withlogging=false)
void preEventProcessing(edm::StreamContext const &)
Time_t beginValue
Definition: Time.h:45
JetCorrectorParameters::Record record
Definition: classes.h:7
void fillRecord(edm::ParameterSet &pset)
cond::persistency::Session session() const
Time_t since
Definition: Types.h:51
boost::uint64_t value() const
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:52
size_t size
Definition: Types.h:72
std::pair< Time_t, Time_t > ValidityInterval
Definition: Time.h:19
void setDescription(const std::string &description)
Definition: IOVEditor.cc:113
void preGlobalBeginLumi(edm::GlobalContext const &)
void setParameters(const edm::ParameterSet &connectionPset)
std::string tag(const std::string &recordName)
std::string name
Definition: Types.h:68
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:43
cond::ValidityInterval lastInterval
Definition: Types.h:70
void appendSinceTime(T *payloadObj, cond::Time_t sinceTime, const std::string &recordName, bool withlogging=false)
unsigned long long Time_t
Definition: Time.h:16
vector< ParameterSet > Parameters
TimeType timeTypeFromName(const std::string &name)
Definition: Time.cc:24
tuple iov
Definition: o2o.py:307
bool isNewTagRequest(const std::string &recordName)
Record & lookUpRecord(const std::string &recordName)
tuple result
Definition: query.py:137
Hash payloadId
Definition: Types.h:53
RunNumber_t run() const
void createNewIOV(T *firstPayloadObj, cond::Time_t firstSinceTime, cond::Time_t firstTillTime, const std::string &recordName, bool withlogging=false)
BackendType
Definition: Types.h:23
void setLogHeaderForRecord(const std::string &recordName, const std::string &provenance, const std::string &usertext)
StreamID const & streamID() const
Definition: StreamContext.h:57
void insert(cond::Time_t since, const cond::Hash &payloadHash, bool checkType=false)
Definition: IOVEditor.cc:135
unsigned int value() const
Definition: StreamID.h:46
void throwException(std::string const &message, std::string const &methodName)
Definition: Exception.cc:17
tuple editor
Definition: idDealer.py:73
void tagInfo(const std::string &recordName, cond::TagInfo_t &result)
std::string lastPayloadToken
Definition: Types.h:71
void preGlobalBeginRun(edm::GlobalContext const &)
static constexpr BackendType DEFAULT_DB
Definition: Types.h:24
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void setEndOfValidity(cond::Time_t validity)
Definition: IOVEditor.cc:102
std::map< std::string, Record > m_callbacks
Time_t endValue
Definition: Time.h:46
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
TimeValue_t value() const
Definition: Timestamp.h:56
PoolDBOutputService(const edm::ParameterSet &iConfig, edm::ActivityRegistry &iAR)
Timestamp const & timestamp() const
Definition: StreamContext.h:62
Time_t till
Definition: Types.h:52