CMS 3D CMS Logo

MatacqProducer.cc
Go to the documentation of this file.
12 #include <boost/algorithm/string.hpp>
13 #include <boost/format.hpp>
14 
15 #include <iostream>
16 #include <memory>
17 
18 #include <cstdio>
19 
20 #include <fstream>
21 #include <iomanip>
22 
24 
25 #include <sys/types.h>
26 #include <unistd.h>
27 #include <csignal>
28 #include <sys/stat.h>
29 #include <glob.h>
30 
31 using namespace std;
32 using namespace boost;
33 using namespace edm;
34 
35 // #undef LogInfo
36 // #define LogInfo(a) cout << "INFO " << a << ": "
37 // #undef LogWarning
38 // #define LogWarning(a) cout << "WARN " << a << ": "
39 // #undef LogDebug
40 // #define LogDebug(a) cout << "DBG " << a << ": "
41 
42 
43 //verbose mode for matacq event retrieval debugging:
44 //static const bool searchDbg = false;
45 
46 //laser freq is 1 every 112 orbit => >80 orbit
47 const int MatacqProducer::orbitTolerance_ = 80;
48 
50 
51 static std::string now(){
52  struct timeval t;
53  gettimeofday(&t, nullptr);
54 
55  char buf[256];
56  strftime(buf, sizeof(buf), "%F %R %S s", localtime(&t.tv_sec));
57  buf[sizeof(buf)-1] = 0;
58 
59  stringstream buf2;
60  buf2 << buf << " " << ((t.tv_usec+500)/1000) << " ms";
61 
62  return buf2.str();
63 }
64 
65 
67  fileNames_(params.getParameter<std::vector<std::string> >("fileNames")),
68  digiInstanceName_(params.getParameter<string>("digiInstanceName")),
69  rawInstanceName_(params.getParameter<string>("rawInstanceName")),
70  timing_(params.getUntrackedParameter<bool>("timing", false)),
71  disabled_(params.getParameter<bool>("disabled")),
72  verbosity_(params.getUntrackedParameter<int>("verbosity", 0)),
73  produceDigis_(params.getParameter<bool>("produceDigis")),
74  produceRaw_(params.getParameter<bool>("produceRaw")),
75  inputRawCollection_(params.getParameter<edm::InputTag>("inputRawCollection")),
76  mergeRaw_(params.getParameter<bool>("mergeRaw")),
77  ignoreTriggerType_(params.getParameter<bool>("ignoreTriggerType")),
78  matacq_(nullptr, 0),
79  inFile_(nullptr),
80  data_(bufferSize),
81  openedFileRunNumber_(0),
82  lastOrb_(0),
83  fastRetrievalThresh_(0),
84  orbitOffsetFile_(params.getUntrackedParameter<std::string>("orbitOffsetFile",
85  "")),
86  inFileName_(""),
87  stats_(stats_init),
88  logFileName_(params.getUntrackedParameter<std::string>("logFileName",
89  "matacqProducer.log")),
90  eventSkipCounter_(0),
91  onErrorDisablingEvtCnt_(params.getParameter<int>("onErrorDisablingEvtCnt")),
92  timeLogFile_(params.getUntrackedParameter<std::string>("timeLogFile", "")),
93  runNumber_(0)
94 {
95  if(verbosity_>=4) cout << "[Matacq " << now() << "] in MatacqProducer ctor" << endl;
96 
97  gettimeofday(&timer_, nullptr);
98 
99  if(!timeLogFile_.empty()){
100  timeLog_.open(timeLogFile_.c_str());
101  if(timeLog_.fail()){
102  cout << "[LaserSorter " << now() << "] "
103  << "Failed to open file " << timeLogFile_ << " to log timing.\n";
104  logTiming_ = false;
105  } else{
106  logTiming_ = true;
107  }
108  }
109 
111 
112  logFile_.open(logFileName_.c_str(), ios::app | ios::out);
113 
114  if(logFile_.bad()){
115  throw cms::Exception("FileOpen") << "Failed to open file "
116  << logFileName_ << " for logging.\n";
117  }
118 
119  inputRawCollectionToken_ = consumes<FEDRawDataCollection>(params.getParameter<InputTag>("inputRawCollection"));
120 
121 
122  if(produceDigis_){
123  if(verbosity_>0) cout << "[Matacq " << now() << "] registering new "
124  "EcalMatacqDigiCollection product with instance name '"
125  << digiInstanceName_ << "'\n";
126  produces<EcalMatacqDigiCollection>(digiInstanceName_);
127  }
128 
129  if(produceRaw_){
130  if(verbosity_>0) cout << "[Matacq " << now() << "] registering new FEDRawDataCollection "
131  "product with instance name '"
132  << rawInstanceName_ << "'\n";
133  produces<FEDRawDataCollection>(rawInstanceName_);
134  }
135 
136  startTime_.tv_sec = startTime_.tv_usec = 0;
137  if(!orbitOffsetFile_.empty()){
138  doOrbitOffset_ = true;
139  loadOrbitOffset();
140  } else{
141  doOrbitOffset_ = false;
142  }
143  if(verbosity_>=4) cout << "[Matacq " << now() << "] exiting MatacqProducer ctor" << endl;
144 }
145 
146 
147 void
149  if(verbosity_>=4) cout << "[Matacq " << now() << "] in MatacqProducer::produce" << endl;
150  if(logTiming_){
151  timeval t;
152  gettimeofday(&t, nullptr);
153 
154  timeLog_ << t.tv_sec << "."
155  << setfill('0') << setw(3) << (t.tv_usec+500)/1000 << setfill(' ')<< "\t"
156  << (t.tv_usec - timer_.tv_usec)*1.
157  + (t.tv_sec - timer_.tv_sec)*1.e6 << "\t";
158  timer_ = t;
159  }
160 
161  if(startTime_.tv_sec==0) gettimeofday(&startTime_, nullptr);
162  ++stats_.nEvents;
163  if(disabled_) return;
164  const uint32_t runNumber = getRunNumber(event);
165  if(runNumber!=runNumber_){
166  newRun(runNumber_, runNumber);
167  }
168  addMatacqData(event);
169 
170  if(logTiming_){
171  timeval t;
172  gettimeofday(&t, nullptr);
173  timeLog_ << (t.tv_usec - timer_.tv_usec)*1.
174  + (t.tv_sec - timer_.tv_sec)*1.e6 << "\n";
175  timer_ = t;
176  }
177 }
178 
179 void
181 
183  event.getByToken(inputRawCollectionToken_, sourceColl);
184 
185  std::unique_ptr<FEDRawDataCollection> rawColl;
186  if(produceRaw_){
187  if(mergeRaw_){
188  rawColl = std::make_unique<FEDRawDataCollection>(*sourceColl);
189  } else{
190  rawColl = std::make_unique<FEDRawDataCollection>();
191  }
192  }
193 
194  auto digiColl = std::make_unique<EcalMatacqDigiCollection>();
195 
196  if(eventSkipCounter_==0){
197  if(sourceColl->FEDData(matacqFedId_).size()>4 && !produceRaw_){
198  //input raw data collection already contains matacqData
200  *digiColl);
201  } else{
202  bool isLaserEvent = (getCalibTriggerType(event) == laserType);
203 
204 
205  // cout << "---> " << (ignoreTriggerType_?"yes":"no") << " " << getCalibTriggerType(event) << endl;
206 
207  if(isLaserEvent || ignoreTriggerType_){
208 
209  const uint32_t runNumber = getRunNumber(event);
210  const uint32_t orbitId = getOrbitId(event);
211 
212  LogInfo("Matacq") << "Run " << runNumber << "\t Orbit " << orbitId << "\n";
213 
214  bool fileChange;
215  if(doOrbitOffset_){
216  map<uint32_t,uint32_t>::iterator it = orbitOffset_.find(runNumber);
217  if(it == orbitOffset_.end()){
218  LogWarning("Matacq") << "Orbit offset not found for run "
219  << runNumber
220  << ". No orbit correction will be applied.";
221  }
222  }
223 
224  if(getMatacqFile(runNumber, orbitId, &fileChange)){
225  //matacq file retrieval succeeded
226  LogInfo("Matacq") << "Matacq data file found for "
227  << "run " << runNumber << " orbit " << orbitId;
228  if(getMatacqEvent(runNumber, orbitId, fileChange)){
229  if(produceDigis_){
230  formatter_.interpretRawData(matacq_, *digiColl);
231  }
232  if(produceRaw_){
233  uint32_t dataLen64 = matacq_.getParsedLen();
234  if(dataLen64 > bufferSize*8 || matacq_.getDccLen()!= dataLen64){
235  LogWarning("Matacq") << " Error in Matacq event fragment length! "
236  << "DCC len: " << matacq_.getDccLen()
237  << "*8 Bytes, Parsed len: "
238  << matacq_.getParsedLen() << "*8 Bytes. "
239  << "Matacq data will not be included for this event.\n";
240  } else{
241  rawColl->FEDData(matacqFedId_).resize(dataLen64*8);
242  copy(data_.begin(), data_.begin() + dataLen64*8,
243  rawColl->FEDData(matacqFedId_).data());
244  }
245  }
246  LogInfo("Matacq") << "Associating matacq data with orbit id "
247  << matacq_.getOrbitId()
248  << " to dcc event with orbit id "
249  << orbitId << std::endl;
250  if(isLaserEvent){
252  } else{
254  }
255  } else{
256  if(isLaserEvent){
257  LogWarning("Matacq") << "No matacq data found for laser event "
258  << "of run " << runNumber << " orbit "
259  << orbitId;
260  }
261  }
262  } else{
263  LogWarning("Matacq") << "No matacq file found for event "
264  << event.id();
265  }
266  }
267  }
268  if(eventSkipCounter_>0){ //error occured for this events
269  // and some events will be skipped following
270  // to this error.
271  LogInfo("Matacq") << " [" << now() << "] "
273  << " next events will be skipped, following to an "
274  << "error on the last processed event, "
275  << "which is expected to be persistant.";
276  }
277  } else{
279  }
280 
281  if(produceRaw_){
282  if(verbosity_>1) cout << "[Matacq " << now() << "] "
283  << "Adding FEDRawDataCollection collection "
284  << " to event.\n";
285  event.put(std::move(rawColl), rawInstanceName_);
286  }
287 
288  if(produceDigis_){
289  if(verbosity_>1) cout << "[Matacq " << now() << "] "
290  << "Adding EcalMatacqDigiCollection collection "
291  << " to event.\n";
292  event.put(std::move(digiColl), digiInstanceName_);
293  }
294 }
295 
296 // #if 0
297 // bool
298 // MatacqProducer::getMatacqEvent(std::ifstream& f,
299 // uint32_t runNumber,
300 // uint32_t orbitId,
301 // bool doWrap,
302 // std::streamoff maxPos){
303 // bool found = false;
304 // streampos startPos = f.tellg();
305 
306 // while(!f.eof()
307 // && !found
308 // && (maxPos<0 || f.tellg()<=maxPos)){
309 // const streamsize headerSize = 8*8;
310 // f.read((char*)&data_[0], headerSize);
311 // if(f.eof()) break;
312 // int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
313 // uint32_t len = MatacqRawEvent::getDccLen(&data_[0], headerSize);
314 // uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
315 // // cout << "Matacq: orbit = " << orb
316 // // << " len = " << len
317 // // << " run = " << run << endl;
318 // if((abs(orb-(int32_t)orbitId) < orbitTolerance_)
319 // && (runNumber==0 || runNumber==run)){
320 // found = true;
321 // //reads the rest of the event:
322 // if(data_.size() < len*8){
323 // throw cms::Exception("Matacq") << "Buffer overflow";
324 // }
325 // f.read((char*)&data_[0]+headerSize, len*8-headerSize);
326 // matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len*8);
327 // } else{
328 // //moves to next event:
329 // f.seekg(len*8 - headerSize, ios::cur);
330 // }
331 // }
332 
333 // f.clear(); //clears eof error to allow seekg
334 // if(doWrap && !found){
335 // f.seekg(0, ios::beg);
336 // found = getMatacqEvent(f, runNumber, orbitId, false, startPos);
337 // }
338 // return found;
339 // }
340 //#endif
341 
342 bool
344  int32_t orbitId,
345  bool fileChange){
346  filepos_t startPos;
347  if(!mtell(startPos)) return false;
348 
349  int32_t startOrb = -1;
350  const size_t headerSize = 8*8;
351  if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
352  startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
353  if(startOrb<0) startOrb = 0;
354  } else{
355  if(verbosity_>2){
356  cout << "[Matacq " << now() << "] Failed to read matacq header. Moved to start of "
357  " the file.\n";
358  }
359  mrewind();
360  if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
361  startPos = 0;
362  startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
363  } else{
364  if(verbosity_>2) cout << "[Matacq " << now() << "] Looks like matacq file is empty"
365  << "\n";
366  return false;
367  }
368  }
369 
370  if(verbosity_>2) cout << "[Matacq " << now() << "] Last read orbit: " << lastOrb_
371  << " looking for orbit " << orbitId
372  << ". Current file position: " << startPos
373  << " Orbit at current position: " << startOrb << "\n";
374 
375  // f.clear();
376  bool didCoarseMove = false;
377 
378  //FIXME: case where posEtim_.invalid() is false
379  if(!posEstim_.invalid()
380  && (abs(lastOrb_-orbitId) > fastRetrievalThresh_)){
381  filepos_t pos = posEstim_.pos(orbitId);
382 
383  // struct stat st;
384  filepos_t fsize;
385  // if(0==stat(inFileName_.c_str(), &st)){
386  if(msize(fsize)){
387  // const int64_t fsize = st.st_size;
388  if(0!=posEstim_.eventLength() && pos > fsize){
389  //estimated position is beyong end of file
390  //-> move to beginning of last event:
391  int64_t evtSize = posEstim_.eventLength()*sizeof(uint64_t);
392  pos = ((int64_t)fsize/evtSize-1)*evtSize;
393  if(verbosity_>2){
394  cout << "[Matacq " << now() << "] Estimated position was beyond end of file. "
395  "Changed to " << pos << "\n";
396  }
397  }
398  } else{
399  LogWarning("Matacq") << "Failed to access file " << inFileName_ << ".";
400  }
401  if(pos>=0){
402  if(verbosity_>2) cout << "[Matacq " << now() << "] jumping to estimated position "
403  << pos << "\n";
404  mseek(pos, SEEK_SET, "Jumping to estimated event position");
405  if(mread((char*)&data_[0], headerSize, "Reading matacq header", true)){
406  didCoarseMove = true;
407  } else{
408  //estimated position might have been beyond the end of the file,
409  //try, with original position:
410  didCoarseMove = false;
411  if(!mread((char*)&data_[0], headerSize, "Reading event header", true)){
412  return false;
413  }
414  }
415  } else{
416  if(verbosity_) cout << "[Matacq " << now() << "] Event orbit outside of orbit range "
417  "of matacq data file events\n";
418  return false;
419  }
420  }
421 
422  int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
423 
424  if(didCoarseMove){
425  //autoadjustement of threshold for coarse move:
426  if(abs(orb-orbitId) > fastRetrievalThresh_){
427  if(verbosity_>2) cout << "[Matacq " << now() << "] Fast retrieval threshold increased from "
429  fastRetrievalThresh_ = 2*abs(orb-orbitId);
430  if(verbosity_>2) cout << " to " << fastRetrievalThresh_ << "\n";
431  }
432 
433  //if coarse move did not improve situation, rolls back:
434  if(startOrb > 0
435  && (abs(orb-orbitId) > abs(startOrb-orbitId))){
436  if(verbosity_>2) cout << "[Matacq " << now() << "] Estimation (-> orbit " << orb << ") "
437  "was worst than original position (-> orbit "
438  << startOrb
439  << "). Restoring position (" << startPos << ").\n";
440  mseek(startPos, SEEK_SET);
441  mread((char*)&data_[0], headerSize, "Reading event header", true);
442  orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
443  }
444  }
445 
446  bool searchBackward = (orb>orbitId)?true:false;
447  //BEWARE: len must be signed, because we are using latter in the code (-len)
448  //expression
449  int len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
450 
451  if(len==0){
452  cout << "[Matacq " << now() << "] read DCC length is null! Cancels matacq event search "
453  << " and move matacq file pointer to beginning of the file. "
454  << "(" << __FILE__ << ":" << __LINE__ << ")."
455  << "\n";
456  //rewind(f);
457  mrewind();
458  return false;
459  }
460 
461  enum state_t { searching, found, failed } state = searching;
462 
463  while(state == searching){
464  orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
465  len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
466  uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
467  if(verbosity_>3){
468  filepos_t pos = -1;
469  mtell(pos);
470  cout << "[Matacq " << now() << "] Header read at file position "
471  << pos
472  << ": orbit = " << orb
473  << " len = " << len << "x8 Byte"
474  << " run = " << run << "\n";
475  }
476  if((abs(orb-orbitId) < orbitTolerance_)
477  && (runNumber==0 || runNumber==run)){
478  state = found;
479  lastOrb_ = orb;
480  //reads the rest of the event:
481  if((int)data_.size() < len*8){
482  throw cms::Exception("Matacq") << "Buffer overflow";
483  }
484  if(verbosity_>2) cout << "[Matacq " << now() << "] Event found. Reading "
485  " matacq event." << "\n";
486  if(!mread((char*)&data_[0], len*8, "Reading matacq event")){
487  if(verbosity_>2) cout << "[Matacq " << now() << "] Failed to read matacq event."
488  << "\n";
489  state = failed;
490  }
491  matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len*8);
492  } else {
493  if((searchBackward && (orb < orbitId))
494  || (!searchBackward && (orb > orbitId))){ //search ended
495  lastOrb_ = orb;
496  state = failed;
497  if(verbosity_>2) cout << "[Matacq " << now()
498  << "] No matacq data found for run " << run
499  << ", orbit ID " << orbitId << "." << "\n";
500  } else{
501  off_t offset = (searchBackward?-len:len)*8;
502  lastOrb_ = orb;
503  if(verbosity_>3){
504  cout << "[Matacq " << now() << "] In matacq file, moving "
505  << abs(offset) << " byte " << (offset>0?"forward":"backward")
506  << ".\n";
507  }
508 
509  if(mseek(offset, SEEK_CUR,
510  (searchBackward?"Moving to previous event":
511  "Moving to next event"))
512  && mread((char*)&data_[0], headerSize, "Reading event header",
513  true)){
514  } else{
515  if(!searchBackward) mseek(-len*8, SEEK_CUR,
516  "Moving to start of last complete event");
517  state = failed;
518  }
519  }
520  }
521  }
522 
523  if(state==found){
524  filepos_t pos = -1;
525  filepos_t fsize = -1;
526  mtell(pos);
527  msize(fsize);
528  if(pos==fsize-1){ //last byte.
529  if(verbosity_>2){
530  cout << "[Matacq " << now() << "] Event found was at the end of the file. Moving "
531  "stream position to beginning of this event."
532  << "\n";
533  }
534  mseek(-(int)len*8-1, SEEK_CUR,
535  "Moving to beginning of last matacq event");
536  }
537  }
538  return (state==found);
539 }
540 
541 
542 bool
543 MatacqProducer::getMatacqFile(uint32_t runNumber, uint32_t orbitId,
544  bool* fileChange){
545  if(openedFileRunNumber_ != 0
546  && openedFileRunNumber_ == runNumber){
547  uint32_t firstOrb, lastOrb;
548  bool goodRange = getOrbitRange(firstOrb, lastOrb);
549  // if(orbitId < firstOrb || orbitId > lastOrb) continue;
550  if(goodRange && firstOrb <= orbitId && orbitId <= lastOrb){
551  if(fileChange!=nullptr) *fileChange = false;
552  return misOpened();
553  }
554  }
555 
556  if(fileNames_.empty()) return false;
557 
558  const string runNumberFormat = "%08d{,_*}";
559  string sRunNumber = str(boost::format(runNumberFormat) % runNumber);
560  //cout << "Run number string: " << sRunNumber << "\n";
561  bool found = false;
562  string fname;
563  uint32_t maxOrb = 0;
564  //we make two iterations to handle the case where the event is procesed
565  //before the matacq data are available. In such case we would have
566  //orbitId > maxOrb (maxOrb: orbit of last written matacq event)
567  for(int itry = 0; itry < 2 && (orbitId > maxOrb); ++itry){
568  if(itry > 0){
569  int n_sec = 1;
570  std::cout << "[Matacq " << now() << "] Event orbit id (" << orbitId << ") goes "
571  "beyound the range of available one. Waiting for " << n_sec << " seconds in case "
572  "it was not written yet to disk.";
573  sleep(n_sec);
574  }
575 
576  for(unsigned i=0; i < fileNames_.size() && !found; ++i){
577  fname = fileNames_[i];
578  boost::algorithm::replace_all(fname, "%run_subdir%",
579  runSubDir(runNumber));
580  boost::algorithm::replace_all(fname, "%run_number%", sRunNumber);
581 
582  glob_t g;
583  int rc = glob(fname.c_str(), GLOB_BRACE, nullptr, &g);
584  if(rc){
585  if(verbosity_ > 1){
586  switch(rc){
587  case GLOB_NOSPACE:
588  std::cout << "[Matacq " << now() << "] Running out of memory while calling glob function to look for matacq file paths\n";
589  break;
590  case GLOB_ABORTED:
591  std::cout << "[Matacq " << now() << "] Read error while calling glob function to look for matacq file paths\n";
592  break;
593  case GLOB_NOMATCH:
594  //ok. No message to report.
595  break;
596  }
597  continue;
598  }
599  } //rc
600  for(unsigned iglob = 0; iglob < g.gl_pathc; ++iglob){
601  char* thePath = g.gl_pathv[iglob];
602  //FIXME: add sanity check on the path
603  static std::atomic<int> nOpenErrors {0};
604  const int maxOpenErrors = 50;
605  if(!mopen(thePath) && nOpenErrors < maxOpenErrors){
606  std::cout << "[Matacq " << now() << "] Failed to open file " << thePath;
607  ++nOpenErrors;
608  if(nOpenErrors == maxOpenErrors){
609  std::cout << nOpenErrors << "This is the " << maxOpenErrors
610  << "th occurence of this error. Report of this error is now disabled.\n";
611  } else{
612  std::cout << "\n";
613  }
614  }
615  uint32_t firstOrb;
616  uint32_t lastOrb;
617  bool goodRange = getOrbitRange(firstOrb, lastOrb);
618  if(goodRange && lastOrb > maxOrb) maxOrb = lastOrb;
619  if(goodRange && firstOrb <= orbitId && orbitId <= lastOrb){
620  found = true;
621  //continue;
622  fname = thePath;
623  if(verbosity_ > 1) std::cout << "[Matacq " << now() << "] Switching to file " << fname << "\n";
624  break;
625  }
626  } //next iglob
627  globfree(&g);
628  }//next filenames
629  } //next itry
630 
631  if(found){
632  LogInfo("Matacq") << "Uses matacq data file: '" << fname << "'\n";
633  } else{
634  if(verbosity_>=0) cout << "[Matacq " << now() << "] no matacq file found "
635  "for run " << runNumber << "\n";
638  if(fileChange!=nullptr) *fileChange = false;
639  return false;
640  }
641 
642  if(found){
644  lastOrb_ = 0;
645  posEstim_.init(this);
646  if(fileChange!=nullptr) *fileChange = true;
647  return true;
648  } else{
649  return false;
650  }
651 }
652 
653 
655  return ev.run();
656 }
657 
659  //on CVS HEAD (June 4, 08), class Event has a method orbitNumber()
660  //we could use here. The code would be shorten to:
661  //return ev.orbitNumber();
662  //we have to deal with what we have in current CMSSW releases:
665  if(!(rawdata.isValid())){
666  throw cms::Exception("NotFound")
667  << "No FED raw data collection found. ECAL raw data are "
668  "required to retrieve the orbit ID";
669  }
670 
671  int orbit = 0;
672  for(int id=601; id<=654; ++id){
673  if(!FEDNumbering::inRange(id)) continue;
674  const FEDRawData& data = rawdata->FEDData(id);
675  const int orbitIdOffset64 = 3;
676  if(data.size()>=8*(orbitIdOffset64+1)){//orbit id is in 4th 64-bit word
677  const unsigned char* pOrbit = data.data() + orbitIdOffset64*8;
678  int thisOrbit = pOrbit[0]
679  | (pOrbit[1] <<8)
680  | (pOrbit[2] <<16)
681  | (pOrbit[3] <<24);
682  if(orbit!=0 && thisOrbit!=0 && abs(orbit-thisOrbit)>orbitTolerance_){
683  //throw cms::Exception("EventCorruption")
684  // << "Orbit ID inconsitency in DCC headers";
685  LogWarning("EventCorruption")
686  << "Orbit ID inconsitency in DCC headers";
687  orbit = 0;
688  break;
689  }
690  if(thisOrbit!=0) orbit = thisOrbit;
691  }
692  }
693 
694  if(orbit==0){
695  // throw cms::Exception("NotFound")
696  // << "Failed to retrieve orbit ID of event "<< ev.id();
697  LogWarning("NotFound") << "Failed to retrieve orbit ID of event "
698  << ev.id();
699  }
700  return orbit;
701 }
702 
706  if(!(rawdata.isValid())){
707  throw cms::Exception("NotFound")
708  << "No FED raw data collection found. ECAL raw data are "
709  "required to retrieve the trigger type";
710  }
711 
713  for(int id=601; id<=654; ++id){
714  if(!FEDNumbering::inRange(id)) continue;
715  const FEDRawData& data = rawdata->FEDData(id);
716  const int detailedTrigger32 = 5;
717  if(data.size()>=4*(detailedTrigger32+1)){
718  const unsigned char* pTType = data.data() + detailedTrigger32*4;
719  int tType = pTType[1] & 0x7;
720  stat.add(tType);
721  }
722  }
723  double p;
724  int tType = stat.result(&p);
725  if(p<0){
726  //throw cms::Exception("NotFound") << "No ECAL DCC data found\n";
727  LogWarning("NotFound") << "No ECAL DCC data found\n";
728  tType = -1;
729  }
730  if(p<.8){
731  //throw cms::Exception("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
732  LogWarning("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
733  tType = -1;
734  }
735  return tType;
736 }
737 
739  mp->mrewind();
740 
741  const size_t headerSize = 8*8;
742  unsigned char data[headerSize];
743  if(!mp->mread((char*)data, headerSize)){
744  if(verbosity_) cout << "[Matacq " << now() << "] reached end of file!\n";
745  firstOrbit_ = eventLength_ = orbitStepMean_ = 0;
746  return;
747  } else{
748  firstOrbit_ = MatacqRawEvent::getOrbitId(data, headerSize);
749  eventLength_ = MatacqRawEvent::getDccLen(data, headerSize);
750  if(verbosity_>1) cout << "[Matacq " << now() << "] First event orbit: " << firstOrbit_
751  << " event length: " << eventLength_
752  << "*8 byte\n";
753  }
754 
755  mp->mrewind();
756 
757  if(eventLength_==0){
758  if(verbosity_) cout << "[Matacq " << now() << "] event length is null!" << endl;
759  return;
760  }
761 
762  filepos_t s;
763  mp->msize(s);
764 
765  //number of complete events:
766  const unsigned nEvents = s/eventLength_/8;
767 
768  if(nEvents==0){
769  if(verbosity_) cout << "[Matacq " << now() << "] File is empty!" << endl;
770  orbitStepMean_ = 0;
771  return;
772  }
773 
774  if(verbosity_>1) cout << "[Matacq " << now() << "] File size: " << s
775  << " Number of events: " << nEvents << endl;
776 
777  //position of last complete events:
778  off_t last = (nEvents-1)*(off_t)eventLength_*8;
779  mp->mseek(last, SEEK_SET, "Moving to beginning of last complete "
780  "matacq event");
781  if(!mp->mread((char*) data, headerSize, "Reading matacq header", true)){
782  LogWarning("Matacq") << "Fast matacq event retrieval failure. "
783  "Falling back to safe retrieval mode.";
784  orbitStepMean_ = 0;
785  }
786 
787  int32_t lastOrb = MatacqRawEvent::getOrbitId(data, headerSize);
788  int32_t lastLen = MatacqRawEvent::getDccLen(data, headerSize);
789 
790  if(verbosity_>1) cout << "[Matacq " << now() << "] Last event orbit: " << lastOrb
791  << " last event length: " << lastLen << endl;
792 
793  //some consistency check
794  if(lastLen!=eventLength_){
795  LogWarning("Matacq")
796  //throw cms::Exception("Matacq")
797  << "Fast matacq event retrieval failure: it looks like "
798  "the matacq file contains events of different sizes.";
799  // " Falling back to safe retrieval mode.";
800  invalid_ = false; //true;
801  orbitStepMean_ = 112; //0;
802  return;
803  }
804 
805  orbitStepMean_ = (lastOrb - firstOrbit_)/nEvents;
806 
807  if(verbosity_>1) cout << "[Matacq " << now() << "] Orbit step mean: " << orbitStepMean_
808  << "\n";
809 
810  invalid_ = false;
811 }
812 
813 int64_t MatacqProducer::PosEstimator::pos(int orb) const{
814  if(orb<firstOrbit_) return -1;
815  uint64_t r = orbitStepMean_!=0?
816  (((uint64_t)(orb-firstOrbit_))/orbitStepMean_)*eventLength_*8
817  :0;
818  if(verbosity_>2) cout << "[Matacq " << now() << "] Estimated Position for orbit " << orb
819  << ": " << r << endl;
820  return r;
821 }
822 
824  mclose();
825  timeval t;
826  gettimeofday(&t, nullptr);
827  if(logTiming_ && startTime_.tv_sec!=0){
828  //not using logger, to allow timing with different logging options
829  cout << "[Matacq " << now() << "] Time elapsed between first event and "
830  "destruction of MatacqProducer: "
831  << ((t.tv_sec-startTime_.tv_sec)*1.
832  + (t.tv_usec-startTime_.tv_usec)*1.e-6) << "s\n";
833  }
834 }
835 
837  std::ifstream f(orbitOffsetFile_.c_str());
838  if(f.bad()){
839  throw cms::Exception("Matacq")
840  << "Failed to open orbit ID correction file '"
841  << orbitOffsetFile_ << "'\n";
842  }
843 
844  cout << "[Matacq " << now() << "] "
845  << "Offset to substract to Matacq events Orbit ID: \n"
846  << "#Run Number\t Offset\n";
847 
848  int iline = 0;
849  string s;
850  stringstream buf;
851  while(f.eof()){
852  getline(f, s);
853  ++iline;
854  if(s[0]=='#'){//comment
855  //skip line:
856  f.ignore(numeric_limits<streamsize>::max(), '\n');
857  continue;
858  }
859  buf.str("");
860  buf << s;
861  int run;
862  int orbit;
863  buf >> run;
864  buf >> orbit;
865  if(buf.bad()){
866  throw cms::Exception("Matacq")
867  << "Syntax error in Orbit offset file '"
868  << orbitOffsetFile_ << "'";
869  }
870  cout << run << "\t" << orbit << "\n";
871  orbitOffset_.insert(pair<int, int>(run, orbit));
872  }
873 }
874 
875 #ifdef USE_STORAGE_MANAGER
876 bool MatacqProducer::mseek(filepos_t offset, int whence, const char* mess){
877  if(0==inFile_.get()) return false;
878  try{
880  if(whence==SEEK_SET) wh = Storage::SET;
881  else if(whence==SEEK_CUR) wh = Storage::CURRENT;
882  else if(whence==SEEK_END) wh = Storage::END;
883  else throw cms::Exception("Bug") << "Bug found in "
884  << __FILE__ << ": "<< __LINE__ << "\n";
885 
886  inFile_->position(offset, wh);
887  } catch(cms::Exception& e){
888  if(verbosity_){
889  cout << "[Matacq " << now() << "] ";
890  if(mess) cout << mess << ". ";
891  cout << "Random access error on input matacq file. ";
892  if(whence==SEEK_SET) cout << "Failed to seek absolute position " << offset;
893  else if(whence==SEEK_CUR) cout << "Failed to move " << offset << " bytes forward";
894  else if(whence==SEEK_END) cout << "Failed to seek position at " << offset << " bytes before end of file";
895  cout << ". Reopening file. " << e.what() << "\n";
897  return false;
898  }
899  }
900  return true;
901 }
902 
904  if(0==inFile_.get()) return false;
905  pos = inFile_->position();
906  return true;
907 }
908 
909 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek){
910  if(0==inFile_.get()) return false;
911 
912  filepos_t pos = -1;
913  if(!mtell(pos)) return false;
914 
915  bool rc = false;
916  try{
917  rc = (n==inFile_->xread(buf, n));
918  } catch(cms::Exception& e){
919  if(verbosity_){
920  cout << "[Matacq " << now() << "] ";
921  if(mess) cout << mess << ". ";
922  cout << "Read failure from input matacq file: "
923  << e.what() << "\n";
924  }
925  //recovering from error:
927  mseek(pos);
928  return false;
929  }
930  if(peek){//asked to restore original file position
931  mseek(pos);
932  }
933  return rc;
934 }
935 
937  if(inFile_.get()==0) return false;
938  s = inFile_.get()->size();
939  return true;
940 }
941 
943  Storage* file = inFile_.get();
944  if(file==0) return false;
945  try{
946  file->rewind();
947  } catch(cms::Exception e){
948  if(verbosity_) cout << "Exception cautgh while rewinding file "
949  << inFileName_ << ": " << e.what() << ". "
950  << "File will be reopened.";
951  return mopen(inFileName_);
952  }
953  return true;
954 }
955 
957  return StorageFactory::get()->check(name);
958 }
959 
961  //close already opened file if any:
962  mclose();
963 
964  try{
965  inFile_
966  = unique_ptr<Storage>(StorageFactory::get()->open(name,
968  inFileName_ = name;
969  } catch(cms::Exception& e){
970  LogWarning("Matacq") << e.what();
971  inFile_.reset();
972  inFileName_ = "";
973  return false;
974  }
975  return true;
976 }
977 
979  if(inFile_.get()!=0){
980  inFile_->close();
981  inFile_.reset();
982  }
983 }
984 
986  return inFile_.get()!=0;
987 }
988 
989 bool MatacqProducer::meof(){
990  if(inFile_.get()==0) return true;
991  return inFile_->eof();
992 }
993 
994 #else //USE_STORAGE_MANAGER not defined
995 bool MatacqProducer::mseek(off_t offset, int whence, const char* mess){
996  if(nullptr==inFile_) return false;
997  const int rc = fseeko(inFile_, offset, whence);
998  if(rc!=0 && verbosity_){
999  cout << "[Matacq " << now() << "] ";
1000  if(mess) cout << mess << ". ";
1001  cout << "Random access error on input matacq file. "
1002  "Rewind file.\n";
1003  mrewind();
1004  }
1005  return rc==0;
1006 }
1007 
1009  if(nullptr==inFile_) return false;
1010  pos = ftello(inFile_);
1011  return pos != -1;
1012 
1013 }
1014 
1015 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek){
1016  if(nullptr==inFile_) return false;
1017  off_t pos = ftello(inFile_);
1018  bool rc = (pos!=-1) && (1==fread(buf, n, 1, inFile_));
1019  if(!rc){
1020  if(verbosity_){
1021  cout << "[Matacq " << now() << "] ";
1022  if(mess) cout << mess << ". ";
1023  cout << "Read failure from input matacq file.\n";
1024  }
1025  clearerr(inFile_);
1026  }
1027  if(peek || !rc){//need to restore file position
1028  if(0!=fseeko(inFile_, pos, SEEK_SET)){
1029  if(verbosity_){
1030  cout << "[Matacq " << now() << "] ";
1031  if(mess) cout << mess << ". ";
1032  cout << "Failed to restore file position of "
1033  "before read error. Rewind file.\n";
1034  }
1035  //rewind(inFile_.get());
1036  mrewind();
1037  lastOrb_ = 0;
1038  }
1039  }
1040  return rc;
1041 }
1042 
1044  if(nullptr==inFile_) return false;
1045  struct stat buf;
1046  if(0!=fstat(fileno(inFile_), &buf)){
1047  s = 0;
1048  return false;
1049  } else{
1050  s = buf.st_size;
1051  return true;
1052  }
1053 }
1054 
1056  if(nullptr==inFile_) return false;
1057  clearerr(inFile_);
1058  return fseeko(inFile_, 0, SEEK_SET)!=0;
1059 }
1060 
1062  struct stat dummy;
1063  return 0==stat(name.c_str(), &dummy);
1064 // if(stat(name.c_str(), &dummy)==0){
1065 // return true;
1066 // } else{
1067 // cout << "[Matacq " << now() << "] Failed to stat file '"
1068 // << name.c_str() << "'. "
1069 // << "Error " << errno << ": " << strerror(errno) << "\n";
1070 // return false;
1071 // }
1072 }
1073 
1075  if(inFile_!=nullptr) mclose();
1076  inFile_ = fopen(name.c_str(), "r");
1077  if(inFile_!=nullptr){
1078  inFileName_ = name;
1079  return true;
1080  } else{
1081  inFileName_ = "";
1082  return false;
1083  }
1084 }
1085 
1087  if(inFile_!=nullptr) fclose(inFile_);
1088  inFile_ = nullptr;
1089 }
1090 
1092  return inFile_!=nullptr;
1093 }
1094 
1096  if(nullptr==inFile_) return true;
1097  return feof(inFile_)==0;
1098 }
1099 
1100 #endif //USE_STORAGE_MANAGER defined
1101 
1103  int millions = runNumber / (1000*1000);
1104  int thousands = (runNumber-millions*1000*1000) / 1000;
1105  int units = runNumber-millions*1000*1000 - thousands*1000;
1106  return str(boost::format("%03d/%03d/%03d") % millions % thousands % units);
1107 }
1108 
1109 void MatacqProducer::newRun(int prevRun, int newRun){
1110  runNumber_ = newRun;
1111  eventSkipCounter_ = 0;
1112  logFile_ << "[" << now() << "] Event count for run "
1113  << runNumber_ << ": "
1114  << "total: " << stats_.nEvents << ", "
1115  << "Laser event with Matacq data: "
1116  << stats_.nLaserEventsWithMatacq << ", "
1117  << "Non laser event (according to DCC header) with Matacq data: "
1118  << stats_.nNonLaserEventsWithMatacq << "\n" << flush;
1119 
1120  stats_.nEvents = 0;
1123 
1124 
1125 }
1126 
1127 bool MatacqProducer::getOrbitRange(uint32_t& firstOrb, uint32_t& lastOrb){
1128  filepos_t pos = -1;
1129  filepos_t fsize = -1;
1130  mtell(pos);
1131  msize(fsize);
1132  const unsigned headerSize = 8*8;
1133  unsigned char header[headerSize];
1134  //FIXME: Don't we need here to rewind?
1135  mseek(0);
1136  if(!mread((char*)header, headerSize, nullptr, false)) return false;
1137  firstOrb = MatacqRawEvent::getOrbitId(header, headerSize);
1138  int len = (int)MatacqRawEvent::getDccLen(header, headerSize);
1139  //number of complete events. If last event is partially written,
1140  //it won't be included in the count.
1141  unsigned nEvts = fsize / (len*64);
1142  //Position of last complete event:
1143  filepos_t lastEvtPos = (nEvts - 1) * len * 64;
1144  mseek(lastEvtPos);
1145  mread((char*)header, headerSize, nullptr, false);
1146  lastOrb = MatacqRawEvent::getOrbitId(header, headerSize);
1147 
1148  //restore file position:
1149  mseek(pos);
1150 
1151  return true;
1152 }
static const char runNumber_[]
bool check(const std::string &url, IOOffset *size=0) const
std::string inFileName_
T getParameter(std::string const &) const
~MatacqProducer() override
std::map< uint32_t, uint32_t > orbitOffset_
uint32_t runNumber_
std::ofstream logFile_
static const int matacqFedId_
static const int orbitTolerance_
std::ofstream timeLog_
Definition: CLHEP.h:16
def copy(args, dbName)
unsigned getRunNum() const
bool getByToken(EDGetToken token, Handle< PROD > &result) const
Definition: Event.h:517
#define nullptr
bool getOrbitRange(uint32_t &firstOrb, uint32_t &lastOrb)
static unsigned getOrbitId(unsigned char *data, size_t size)
std::vector< std::string > fileNames_
bool ev
char const * what() const override
Definition: Exception.cc:103
std::string digiInstanceName_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
Definition: Activities.doc:4
size_t size() const
Lenght of the data buffer in bytes.
Definition: FEDRawData.h:47
uint32_t getRunNumber(edm::Event &ev) const
uint32_t getOrbitId() const
unsigned getDccLen() const
uint32_t getOrbitId(edm::Event &ev) const
Relative
Definition: Storage.h:23
bool mcheck(const std::string &name)
struct MatacqProducer::stats_t stats_
void add(const T &value)
Definition: Majority.h:23
static const StorageFactory * get(void)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
int64_t pos(int orb) const
edm::EDGetTokenT< FEDRawDataCollection > inputRawCollectionToken_
void addMatacqData(edm::Event &event)
static std::string runSubDir(uint32_t runNumber)
RunNumber_t run() const
Definition: Event.h:101
bool mread(char *buf, size_t n, const char *mess=0, bool peek=false)
Abs< T >::type abs(const T &t)
Definition: Abs.h:22
double f[11][100]
format
Some error handling for the usage.
bool isValid() const
Definition: HandleBase.h:74
std::string orbitOffsetFile_
void produce(edm::Event &event, const edm::EventSetup &eventSetup) override
int getCalibTriggerType(edm::Event &ev) const
const int nEvts
std::string timeLogFile_
unsigned long long uint64_t
Definition: Time.h:15
MatacqDataFormatter formatter_
static const uint16_t invalid_
Definition: Constants.h:16
uint32_t openedFileRunNumber_
PosEstimator posEstim_
void init(MatacqProducer *mp)
bool mtell(filepos_t &pos)
string fname
main script
static const int bufferSize
static bool inRange(int)
MatacqProducer(const edm::ParameterSet &params)
edm::EventID id() const
Definition: EventBase.h:59
TString units(TString variable, Char_t axis)
HLT enums.
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
static std::string now()
bool mseek(filepos_t offset, int whence=SEEK_SET, const char *mess=0)
MatacqRawEvent matacq_
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
std::string rawInstanceName_
void interpretRawData(const FEDRawData &data, EcalMatacqDigiCollection &matacqDigiCollection)
T result(double *proba) const
Definition: Majority.h:28
std::string logFileName_
UInt_t nEvents
Definition: hcalCalib.cc:41
bool msize(filepos_t &s)
std::vector< unsigned char > data_
void newRun(int prevRun, int newRun)
virtual void rewind(void)
Definition: Storage.cc:114
bool getMatacqFile(uint32_t runNumber, uint32_t orbitId, bool *fileChange=0)
static const stats_t stats_init
#define str(s)
def move(src, dest)
Definition: eostools.py:511
Definition: event.py:1
bool mopen(const std::string &name)
std::unique_ptr< Storage > open(const std::string &url, int mode=IOFlags::OpenRead) const
bool getMatacqEvent(uint32_t runNumber, int32_t orbitId, bool fileChange)