CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
PoolDBOutputService.cc
Go to the documentation of this file.
11 //
12 #include <vector>
13 #include<memory>
14 #include <cassert>
15 
16 // For PoolDBOutputService::currentTime() to work, we have to keep track
17 // of which stream is presently being processed on a given thread during the call of
18 // a module which calls that method.
// The value -1 marks "no stream recorded for this thread"; currentTime() asserts against it.
19 static thread_local int s_streamIndex = -1;
20 
// Builds one Record from a "toPut" parameter set and registers it, together with an
// empty cond::UserLogInfo, under the record name.
// NOTE(review): the signature line (22) and line 31 are missing from this listing.
21 void
23  Record thisrecord;
24 
// "record" is the lookup key used by m_callbacks / m_logheaders; "tag" is stored in m_tag.
25  thisrecord.m_idName = pset.getParameter<std::string>("record");
26  thisrecord.m_tag = pset.getParameter<std::string>("tag");
27 
// The per-record "closeIOV" falls back to the service-wide m_closeIOV when not given.
28  thisrecord.m_closeIOV =
29  pset.getUntrackedParameter<bool>("closeIOV", m_closeIOV);
30 
32 
33  m_callbacks.insert(std::make_pair(thisrecord.m_idName,thisrecord));
34 
// Each registered record also gets a default-constructed log header entry.
35  cond::UserLogInfo userloginfo;
36  m_logheaders.insert(std::make_pair(thisrecord.m_idName,userloginfo));
37 }
38 
// Constructor body: reads the configuration, creates the DB session, registers the
// records listed in "toPut" and hooks the framework callbacks matching the time type.
// NOTE(review): the signature line (39) and lines 54 and 71 are missing from this listing.
40  m_timetypestr(""),
41  m_currentTimes{},
42  m_session(),
43  m_dbstarted( false ),
44  m_callbacks(),
45  m_closeIOV(false),
46  m_logheaders()
47 {
48  m_closeIOV=iConfig.getUntrackedParameter<bool>("closeIOV",m_closeIOV);
49 
// The time type defaults to "runnumber" when "timetype" is not configured.
50  m_timetypestr=iConfig.getUntrackedParameter< std::string >("timetype","runnumber");
51  m_timetype = cond::time::timeTypeFromName( m_timetypestr );
52 
53  edm::ParameterSet connectionPset = iConfig.getParameter<edm::ParameterSet>("DBParameters");
// `connection` is declared on the missing line 54.
55  connection.setParameters( connectionPset );
56  connection.configure();
57  std::string connectionString = iConfig.getParameter<std::string>("connect");
58  m_session = connection.createSession( connectionString, true );
59 
// Register every record described in the "toPut" list.
60  typedef std::vector< edm::ParameterSet > Parameters;
61  Parameters toPut=iConfig.getParameter<Parameters>("toPut");
62  for(Parameters::iterator itToPut = toPut.begin(); itToPut != toPut.end(); ++itToPut)
63  fillRecord( *itToPut);
64 
65 
66  iAR.watchPostEndJob(this,&cond::service::PoolDBOutputService::postEndJob);
// m_currentTimes holds one entry per stream; it is sized once the bounds are known.
67  iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
68  m_currentTimes.resize(iBounds.maxNumberOfStreams());
69  });
70  if( m_timetype == cond::timestamp ){ // timestamp: the stream index is set per module call
72  iAR.watchPreModuleEvent(this, &cond::service::PoolDBOutputService::preModuleEvent);
73  iAR.watchPostModuleEvent(this, &cond::service::PoolDBOutputService::postModuleEvent);
74  } else if( m_timetype == cond::runnumber ){ // runnumber
75  // NOTE: this assumes only one run is being processed at a time.
76  // This is true for 7_1_X, but plans are to allow multiple in flight at a time.
77  s_streamIndex = 0;
78  iAR.watchPreGlobalBeginRun(this,&cond::service::PoolDBOutputService::preGlobalBeginRun);
79  } else if( m_timetype == cond::lumiid ){
80  // NOTE: this assumes only one lumi is being processed at a time.
81  // This is true for 7_1_X, but plans are to allow multiple in flight at a time.
82  s_streamIndex = 0;
83  iAR.watchPreGlobalBeginLumi(this,&cond::service::PoolDBOutputService::preGlobalBeginLumi);
84  }
85 }
86 
// Returns the m_session member.
// NOTE(review): the signature lines are missing from this listing; presumably session() const.
89  return m_session;
90 }
91 
// Returns the tag (m_tag) of the record registered under recordName.
// lookUpRecord() reports an error via cond::throwException for an unregistered record.
// NOTE(review): the signature lines are missing from this listing; presumably tag().
94  return this->lookUpRecord(recordName).m_tag;
95 }
96 
// Returns the m_isNewTag flag of the record registered under recordName.
// lookUpRecord() refreshes that flag from the database on every call.
// NOTE(review): the signature line (98) is missing from this listing; presumably isNewTagRequest().
97 bool
99  Record& myrecord=this->lookUpRecord(recordName);
100  return myrecord.m_isNewTag;
101 }
102 
// Creates the database if it does not exist yet and marks the service as started.
// Any std::exception is re-reported through cond::throwException, tagged with
// "PoolDBOutputService::initDB".
// NOTE(review): the signature line (104) is missing from this listing.
103 void
105 {
106  std::lock_guard<std::recursive_mutex> lock(m_mutex);
107  cond::persistency::TransactionScope scope( m_session.transaction() );
// start(false): the argument is the readOnly flag, so this transaction is opened for writing.
108  scope.start( false );
109  try{
110  if( !m_session.existsDatabase() ) m_session.createDatabase();
111 
112  } catch( const std::exception& er ){
113  cond::throwException( std::string(er.what()),"PoolDBOutputService::initDB" );
114  }
115  scope.close();
116  m_dbstarted=true;
117 }
118 
// Commits the session transaction if the database was started, then clears m_dbstarted.
// NOTE(review): the signature line (120) is missing from this listing; presumably the
// postEndJob callback registered in the constructor.
119 void
121 {
122  if( m_dbstarted) {
123  m_session.transaction().commit();
124  m_dbstarted = false;
125  }
126 }
127 
// Stores the timestamp value from iContext in the m_currentTimes slot of iContext's stream.
// NOTE(review): the signature line (129) is missing from this listing; presumably
// preEventProcessing(edm::StreamContext const&).
128 void
130 {
131  m_currentTimes[iContext.streamID().value()] = iContext.timestamp().value();
132 }
133 
// Records, for the calling thread, the stream id taken from iContext.
// NOTE(review): the signature line (135) is missing from this listing; presumably preModuleEvent().
134 void
136  s_streamIndex = iContext.streamID().value();
137 }
138 
// Resets the calling thread's stream index to the "no stream" sentinel (-1).
// NOTE(review): the signature line (140) is missing from this listing; presumably postModuleEvent().
139 void
141  s_streamIndex = -1;
142 }
143 
// Writes the run number from iContext into every entry of m_currentTimes.
// NOTE(review): the signature line (145) is missing from this listing; presumably preGlobalBeginRun().
144 void
146  for( auto& time : m_currentTimes) {
147  time = iContext.luminosityBlockID().run();
148  }
149 }
150 
// Writes the luminosity block id value from iContext into every entry of m_currentTimes.
// NOTE(review): the signature line (152) is missing from this listing; presumably preGlobalBeginLumi().
151 void
153  for( auto& time : m_currentTimes) {
154  time = iContext.luminosityBlockID().value();
155  }
156 }
157 
// Rolls back the session transaction if the database is still marked as started.
// NOTE(review): the signature line (158) is missing from this listing; presumably the destructor.
159  if( m_dbstarted) {
160  m_session.transaction().rollback();
161  }
162 }
163 
// Calls initDB() under the service mutex unless the database is already started.
// NOTE(review): the signature lines are missing from this listing.
165  std::lock_guard<std::recursive_mutex> lock(m_mutex);
166  if (!m_dbstarted) initDB();
167 }
168 
// Returns the endValue of the timeTypeSpecs entry for the configured time type.
// NOTE(review): the signature lines are missing from this listing.
171  return timeTypeSpecs[m_timetype].endValue;
172 }
173 
// Returns the beginValue of the timeTypeSpecs entry for the configured time type.
// NOTE(review): the signature lines are missing from this listing.
176  return timeTypeSpecs[m_timetype].beginValue;
177 }
178 
// Returns the m_currentTimes entry for the stream recorded in the thread-local s_streamIndex.
// The assert fires when no stream index has been recorded for the calling thread.
// NOTE(review): the signature lines are missing from this listing; presumably currentTime().
181  assert(-1 != s_streamIndex);
182  return m_currentTimes[s_streamIndex];
183 }
184 
// Creates the IOV sequence for a record whose tag does not exist yet and inserts its
// first payload at firstSinceTime.
// Reports an error via cond::throwException when the tag already exists, and re-reports
// any std::exception from the editor calls with the method name appended.
// NOTE(review): the first signature line (186) is missing from this listing.
// NOTE(review): firstTillTime, withlogging and the local iovToken are not referenced
// in the visible body.
185 void
187  const std::string payloadType,
188  cond::Time_t firstSinceTime,
189  cond::Time_t firstTillTime,
190  const std::string& recordName,
191  bool withlogging){
192  std::lock_guard<std::recursive_mutex> lock(m_mutex);
193 
194  cond::persistency::TransactionScope scope( m_session.transaction() );
195  Record& myrecord=this->lookUpRecord(recordName);
196  if(!myrecord.m_isNewTag) {
197  cond::throwException( myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
198  }
199  std::string iovToken;
200 
201  try{
202  // FIXME: should the synchronization type and description be passed in like the other parameters?
203  cond::persistency::IOVEditor editor = m_session.createIov( payloadType, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY );
204  editor.setDescription( "New Tag" );
205  editor.insert( firstSinceTime, firstPayloadId );
// The user text from the record's log header is passed to flush().
206  cond::UserLogInfo a=this->lookUpUserLogInfo(recordName);
207  editor.flush( a.usertext );
208  myrecord.m_isNewTag=false;
209  }catch(const std::exception& er){
210  cond::throwException(std::string(er.what()) + " from PoolDBOutputService::createNewIOV ",
211  "PoolDBOutputService::createNewIOV");
212  }
213  scope.close();
214 }
215 
// Variant of createNewIOV that takes no payload type argument: the IOV sequence is created
// with createIovForPayload() and the payload type is read back from the editor.
// Reports an error via cond::throwException when the tag already exists, and re-reports
// any std::exception from the editor calls with the method name appended.
// NOTE(review): the first signature line (217) and line 229 are missing from this
// listing; payloadType is presumably declared on line 229.
// NOTE(review): firstTillTime, withlogging and the local iovToken are not referenced
// in the visible body.
216 void
218  cond::Time_t firstSinceTime,
219  cond::Time_t firstTillTime,
220  const std::string& recordName,
221  bool withlogging){
222  std::lock_guard<std::recursive_mutex> lock(m_mutex);
223  cond::persistency::TransactionScope scope( m_session.transaction() );
224  Record& myrecord=this->lookUpRecord(recordName);
225  if(!myrecord.m_isNewTag) {
226  cond::throwException( myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
227  }
228  std::string iovToken;
230  try{
231  // FIXME: should the synchronization type and description be passed in like the other parameters?
232  cond::persistency::IOVEditor editor = m_session.createIovForPayload( firstPayloadId, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY );
233  editor.setDescription( "New Tag" );
234  payloadType = editor.payloadType();
235  editor.insert( firstSinceTime, firstPayloadId );
// The user text from the record's log header is passed to flush().
236  cond::UserLogInfo a=this->lookUpUserLogInfo(recordName);
237  editor.flush( a.usertext );
238  myrecord.m_isNewTag=false;
239  }catch(const std::exception& er){
240  cond::throwException(std::string(er.what()) + " from PoolDBOutputService::createNewIOV ",
241  "PoolDBOutputService::createNewIOV");
242  }
243  scope.close();
244 }
245 
// Inserts payloadId at `time` into the existing IOV sequence of the record's tag.
// Reports an error via cond::throwException when the tag does not exist yet.
// NOTE(review): the first signature line (247) and lines 258 and 267 are missing from
// this listing; line 267 presumably starts the cond::throwException call in the catch
// block, and payloadType is presumably declared on line 258.
// NOTE(review): withlogging is not referenced in the visible body.
246 void
248  cond::Time_t time,
249  const std::string& recordName,
250  bool withlogging) {
251  std::lock_guard<std::recursive_mutex> lock(m_mutex);
252  cond::persistency::TransactionScope scope( m_session.transaction() );
253  Record& myrecord=this->lookUpRecord(recordName);
254  if( myrecord.m_isNewTag ) {
255  cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
256  "PoolDBOutputService::appendSinceTime");
257  }
259  try{
260  cond::persistency::IOVEditor editor = m_session.editIov( myrecord.m_tag );
261  payloadType = editor.payloadType();
262  editor.insert( time, payloadId );
// The user text from the record's log header is passed to flush().
263  cond::UserLogInfo a=this->lookUpUserLogInfo(recordName);
264  editor.flush( a.usertext );
265 
266  }catch(const std::exception& er){
268  "PoolDBOutputService::appendSinceTime");
269  }
270  scope.close();
271 }
272 
// Returns a reference to the Record registered under recordName, after refreshing its
// m_isNewTag flag from the database (true when no IOV exists for the record's tag).
// Calls initDB() first if the database has not been started.
// Reports an error via cond::throwException when recordName was never registered.
// NOTE(review): the signature lines are missing from this listing.
275  std::lock_guard<std::recursive_mutex> lock(m_mutex);
276  if (!m_dbstarted) this->initDB();
277  cond::persistency::TransactionScope scope( m_session.transaction() );
278  std::map<std::string,Record>::iterator it=m_callbacks.find(recordName);
279  if(it==m_callbacks.end()) {
280  cond::throwException("The record \""+recordName +"\" has not been registered.","PoolDBOutputService::lookUpRecord");
281  }
// The flag is recomputed on every lookup rather than cached.
282  if( !m_session.existsIov( it->second.m_tag) ){
283  it->second.m_isNewTag=true;
284  } else {
285  it->second.m_isNewTag=false;
286  }
287  scope.close();
288  return it->second;
289 }
290 
// Returns a reference to the log header entry stored for recordName.
// Throws cond::Exception when no entry exists for that record.
// NOTE(review): the signature lines are missing from this listing.
// NOTE(review): unlike the neighbouring methods, the visible body takes no lock on m_mutex.
293  std::map<std::string,cond::UserLogInfo>::iterator it=m_logheaders.find(recordName);
294  if(it==m_logheaders.end()) throw cond::Exception("Log db was not set for record " + recordName + " from PoolDBOutputService::lookUpUserLogInfo");
295  return it->second;
296 }
297 
// Sets the end of validity of the record's existing tag to lastTill and flushes the
// change with the message "Tag closed.".
// Reports an error via cond::throwException when the tag does not exist yet.
// NOTE(review): the first signature line (299) is missing from this listing.
// NOTE(review): withlogging is not referenced in the visible body.
298 void
300  bool withlogging) {
301  std::lock_guard<std::recursive_mutex> lock(m_mutex);
302  // Not fully working; not to be used for now.
// Here the record is looked up before this method's own TransactionScope is created.
303  Record & myrecord = lookUpRecord(recordName);
304  cond::persistency::TransactionScope scope( m_session.transaction() );
305 
306  if( myrecord.m_isNewTag ) {
307  cond::throwException(std::string("Cannot close non-existing tag ") + myrecord.m_tag,
308  "PoolDBOutputService::closeIOV");
309  }
310  cond::persistency::IOVEditor editor = m_session.editIov( myrecord.m_tag );
311  editor.setEndOfValidity( lastTill );
312  editor.flush("Tag closed.");
313  scope.close();
314 }
315 
316 
// Overwrites the provenance and usertext fields of the log header stored for recordName.
// lookUpUserLogInfo() throws cond::Exception when the record has no log header entry.
// NOTE(review): the signature line (318) is missing from this listing.
317 void
319 {
320  cond::UserLogInfo& myloginfo=this->lookUpUserLogInfo(recordName);
321  myloginfo.provenance=dataprovenance;
322  myloginfo.usertext=usertext;
323 }
324 
325 // Still required.
// Fills `result` with the tag name and IOV sequence size of the record and, when the
// sequence is not empty, with the last interval and last payload id.
// NOTE(review): the signature line (327) is missing from this listing.
// NOTE(review): no TransactionScope is opened around readIov() in the visible body, unlike
// the other database-accessing methods here; confirm whether one is needed.
326 void
328  //
329  std::lock_guard<std::recursive_mutex> lock(m_mutex);
330  Record& record = lookUpRecord(recordName);
331  result.name=record.m_tag;
332  // Use an IOVProxy to read the sequence size and the last entry.
333  cond::persistency::IOVProxy iov = m_session.readIov( record.m_tag );
334  result.size=iov.sequenceSize();
// lastInterval and lastPayloadToken are left untouched for an empty sequence.
335  if (result.size>0) {
336  cond::Iov_t last = iov.getLast();
337  result.lastInterval = cond::ValidityInterval( last.since, last.till );
338  result.lastPayloadToken = last.payloadId;
339  }
340 }
static thread_local int s_streamIndex
const TimeTypeSpecs timeTypeSpecs[]
Definition: Time.cc:22
T getParameter(std::string const &) const
T getUntrackedParameter(std::string const &, T const &) const
boost::uint64_t value() const
Base exception class for the object to relational access.
Definition: Exception.h:11
void closeIOV(Time_t lastTill, const std::string &recordName, bool withlogging=false)
void preEventProcessing(edm::StreamContext const &)
Time_t beginValue
Definition: Time.h:45
JetCorrectorParameters::Record record
Definition: classes.h:7
std::string provenance
Definition: Types.h:23
void fillRecord(edm::ParameterSet &pset)
std::map< std::string, cond::UserLogInfo > m_logheaders
cond::persistency::Session session() const
assert(m_qm.get())
Time_t since
Definition: Types.h:54
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:52
size_t size
Definition: Types.h:75
std::pair< Time_t, Time_t > ValidityInterval
Definition: Time.h:19
void setDescription(const std::string &description)
Definition: IOVEditor.cc:117
tuple recordName
Definition: align_cfg.py:66
void preGlobalBeginLumi(edm::GlobalContext const &)
void throwException(const std::string &message, const std::string &methodName)
Definition: Exception.cc:21
void setParameters(const edm::ParameterSet &connectionPset)
std::string tag(const std::string &recordName)
std::string name
Definition: Types.h:71
tuple result
Definition: mps_fire.py:84
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:43
cond::ValidityInterval lastInterval
Definition: Types.h:73
void appendSinceTime(T *payloadObj, cond::Time_t sinceTime, const std::string &recordName, bool withlogging=false)
unsigned long long Time_t
Definition: Time.h:16
vector< ParameterSet > Parameters
TimeType timeTypeFromName(const std::string &name)
Definition: Time.cc:24
tuple iov
Definition: o2o.py:307
bool isNewTagRequest(const std::string &recordName)
Record & lookUpRecord(const std::string &recordName)
Hash payloadId
Definition: Types.h:56
RunNumber_t run() const
void start(bool readOnly=true)
Definition: Session.cc:227
void createNewIOV(T *firstPayloadObj, cond::Time_t firstSinceTime, cond::Time_t firstTillTime, const std::string &recordName, bool withlogging=false)
void setLogHeaderForRecord(const std::string &recordName, const std::string &provenance, const std::string &usertext)
StreamID const & streamID() const
Definition: StreamContext.h:57
void insert(cond::Time_t since, const cond::Hash &payloadHash, bool checkType=false)
Definition: IOVEditor.cc:139
string connectionString
Definition: autoCondHLT.py:4
unsigned int value() const
Definition: StreamID.h:46
tuple editor
Definition: idDealer.py:73
void tagInfo(const std::string &recordName, cond::TagInfo_t &result)
std::string lastPayloadToken
Definition: Types.h:74
void preGlobalBeginRun(edm::GlobalContext const &)
std::string usertext
Definition: Types.h:24
double a
Definition: hdecay.h:121
void postModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
void setEndOfValidity(cond::Time_t validity)
Definition: IOVEditor.cc:106
std::map< std::string, Record > m_callbacks
Time_t endValue
Definition: Time.h:46
void preModuleEvent(edm::StreamContext const &, edm::ModuleCallingContext const &)
std::string payloadType() const
Definition: IOVEditor.cc:94
cond::UserLogInfo & lookUpUserLogInfo(const std::string &recordName)
TimeValue_t value() const
Definition: Timestamp.h:56
PoolDBOutputService(const edm::ParameterSet &iConfig, edm::ActivityRegistry &iAR)
Timestamp const & timestamp() const
Definition: StreamContext.h:62
Time_t till
Definition: Types.h:55