CMS 3D CMS Logo

MatacqProducer.cc
Go to the documentation of this file.
12 #include <boost/algorithm/string.hpp>
13 #include <boost/format.hpp>
14 
15 #include <iostream>
16 #include <memory>
17 
18 #include <cstdio>
19 
20 #include <fstream>
21 #include <iomanip>
22 
24 
25 #include <sys/types.h>
26 #include <unistd.h>
27 #include <csignal>
28 #include <sys/stat.h>
29 #include <glob.h>
30 
31 using namespace std;
32 using namespace boost;
33 using namespace edm;
34 
35 // #undef LogInfo
36 // #define LogInfo(a) cout << "INFO " << a << ": "
37 // #undef LogWarning
38 // #define LogWarning(a) cout << "WARN " << a << ": "
39 // #undef LogDebug
40 // #define LogDebug(a) cout << "DBG " << a << ": "
41 
42 //verbose mode for matacq event retrieval debugging:
43 //static const bool searchDbg = false;
44 
45 //laser freq is 1 every 112 orbit => >80 orbit
46 const int MatacqProducer::orbitTolerance_ = 80;
47 
49 
50 static std::string now() {
51  struct timeval t;
52  gettimeofday(&t, nullptr);
53 
54  char buf[256];
55  strftime(buf, sizeof(buf), "%F %R %S s", localtime(&t.tv_sec));
56  buf[sizeof(buf) - 1] = 0;
57 
58  stringstream buf2;
59  buf2 << buf << " " << ((t.tv_usec + 500) / 1000) << " ms";
60 
61  return buf2.str();
62 }
63 
65  : fileNames_(params.getParameter<std::vector<std::string> >("fileNames")),
66  digiInstanceName_(params.getParameter<string>("digiInstanceName")),
67  rawInstanceName_(params.getParameter<string>("rawInstanceName")),
68  timing_(params.getUntrackedParameter<bool>("timing", false)),
69  disabled_(params.getParameter<bool>("disabled")),
70  verbosity_(params.getUntrackedParameter<int>("verbosity", 0)),
71  produceDigis_(params.getParameter<bool>("produceDigis")),
72  produceRaw_(params.getParameter<bool>("produceRaw")),
73  inputRawCollection_(params.getParameter<edm::InputTag>("inputRawCollection")),
74  mergeRaw_(params.getParameter<bool>("mergeRaw")),
75  ignoreTriggerType_(params.getParameter<bool>("ignoreTriggerType")),
76  matacq_(nullptr, 0),
77  inFile_(nullptr),
78  data_(bufferSize),
79  openedFileRunNumber_(0),
80  lastOrb_(0),
81  fastRetrievalThresh_(0),
82  orbitOffsetFile_(params.getUntrackedParameter<std::string>("orbitOffsetFile", "")),
83  inFileName_(""),
84  stats_(stats_init),
85  logFileName_(params.getUntrackedParameter<std::string>("logFileName", "matacqProducer.log")),
86  eventSkipCounter_(0),
87  onErrorDisablingEvtCnt_(params.getParameter<int>("onErrorDisablingEvtCnt")),
88  timeLogFile_(params.getUntrackedParameter<std::string>("timeLogFile", "")),
89  runNumber_(0) {
90  if (verbosity_ >= 4)
91  cout << "[Matacq " << now() << "] in MatacqProducer ctor" << endl;
92 
93  gettimeofday(&timer_, nullptr);
94 
95  if (!timeLogFile_.empty()) {
96  timeLog_.open(timeLogFile_.c_str());
97  if (timeLog_.fail()) {
98  cout << "[LaserSorter " << now() << "] "
99  << "Failed to open file " << timeLogFile_ << " to log timing.\n";
100  logTiming_ = false;
101  } else {
102  logTiming_ = true;
103  }
104  }
105 
107 
108  logFile_.open(logFileName_.c_str(), ios::app | ios::out);
109 
110  if (logFile_.bad()) {
111  throw cms::Exception("FileOpen") << "Failed to open file " << logFileName_ << " for logging.\n";
112  }
113 
114  inputRawCollectionToken_ = consumes<FEDRawDataCollection>(params.getParameter<InputTag>("inputRawCollection"));
115 
116  if (produceDigis_) {
117  if (verbosity_ > 0)
118  cout << "[Matacq " << now()
119  << "] registering new "
120  "EcalMatacqDigiCollection product with instance name '"
121  << digiInstanceName_ << "'\n";
122  produces<EcalMatacqDigiCollection>(digiInstanceName_);
123  }
124 
125  if (produceRaw_) {
126  if (verbosity_ > 0)
127  cout << "[Matacq " << now()
128  << "] registering new FEDRawDataCollection "
129  "product with instance name '"
130  << rawInstanceName_ << "'\n";
131  produces<FEDRawDataCollection>(rawInstanceName_);
132  }
133 
134  startTime_.tv_sec = startTime_.tv_usec = 0;
135  if (!orbitOffsetFile_.empty()) {
136  doOrbitOffset_ = true;
137  loadOrbitOffset();
138  } else {
139  doOrbitOffset_ = false;
140  }
141  if (verbosity_ >= 4)
142  cout << "[Matacq " << now() << "] exiting MatacqProducer ctor" << endl;
143 }
144 
146  if (verbosity_ >= 4)
147  cout << "[Matacq " << now() << "] in MatacqProducer::produce" << endl;
148  if (logTiming_) {
149  timeval t;
150  gettimeofday(&t, nullptr);
151 
152  timeLog_ << t.tv_sec << "." << setfill('0') << setw(3) << (t.tv_usec + 500) / 1000 << setfill(' ') << "\t"
153  << (t.tv_usec - timer_.tv_usec) * 1. + (t.tv_sec - timer_.tv_sec) * 1.e6 << "\t";
154  timer_ = t;
155  }
156 
157  if (startTime_.tv_sec == 0)
158  gettimeofday(&startTime_, nullptr);
159  ++stats_.nEvents;
160  if (disabled_)
161  return;
162  const uint32_t runNumber = getRunNumber(event);
163  if (runNumber != runNumber_) {
164  newRun(runNumber_, runNumber);
165  }
166  addMatacqData(event);
167 
168  if (logTiming_) {
169  timeval t;
170  gettimeofday(&t, nullptr);
171  timeLog_ << (t.tv_usec - timer_.tv_usec) * 1. + (t.tv_sec - timer_.tv_sec) * 1.e6 << "\n";
172  timer_ = t;
173  }
174 }
175 
178  event.getByToken(inputRawCollectionToken_, sourceColl);
179 
180  std::unique_ptr<FEDRawDataCollection> rawColl;
181  if (produceRaw_) {
182  if (mergeRaw_) {
183  rawColl = std::make_unique<FEDRawDataCollection>(*sourceColl);
184  } else {
185  rawColl = std::make_unique<FEDRawDataCollection>();
186  }
187  }
188 
189  auto digiColl = std::make_unique<EcalMatacqDigiCollection>();
190 
191  if (eventSkipCounter_ == 0) {
192  if (sourceColl->FEDData(matacqFedId_).size() > 4 && !produceRaw_) {
193  //input raw data collection already contains matacqData
195  } else {
196  bool isLaserEvent = (getCalibTriggerType(event) == laserType);
197 
198  // cout << "---> " << (ignoreTriggerType_?"yes":"no") << " " << getCalibTriggerType(event) << endl;
199 
200  if (isLaserEvent || ignoreTriggerType_) {
201  const uint32_t runNumber = getRunNumber(event);
202  const uint32_t orbitId = getOrbitId(event);
203 
204  LogInfo("Matacq") << "Run " << runNumber << "\t Orbit " << orbitId << "\n";
205 
206  bool fileChange;
207  if (doOrbitOffset_) {
208  map<uint32_t, uint32_t>::iterator it = orbitOffset_.find(runNumber);
209  if (it == orbitOffset_.end()) {
210  LogWarning("Matacq") << "Orbit offset not found for run " << runNumber
211  << ". No orbit correction will be applied.";
212  }
213  }
214 
215  if (getMatacqFile(runNumber, orbitId, &fileChange)) {
216  //matacq file retrieval succeeded
217  LogInfo("Matacq") << "Matacq data file found for "
218  << "run " << runNumber << " orbit " << orbitId;
219  if (getMatacqEvent(runNumber, orbitId, fileChange)) {
220  if (produceDigis_) {
221  formatter_.interpretRawData(matacq_, *digiColl);
222  }
223  if (produceRaw_) {
224  uint32_t dataLen64 = matacq_.getParsedLen();
225  if (dataLen64 > bufferSize * 8 || matacq_.getDccLen() != dataLen64) {
226  LogWarning("Matacq") << " Error in Matacq event fragment length! "
227  << "DCC len: " << matacq_.getDccLen()
228  << "*8 Bytes, Parsed len: " << matacq_.getParsedLen() << "*8 Bytes. "
229  << "Matacq data will not be included for this event.\n";
230  } else {
231  rawColl->FEDData(matacqFedId_).resize(dataLen64 * 8);
232  copy(data_.begin(), data_.begin() + dataLen64 * 8, rawColl->FEDData(matacqFedId_).data());
233  }
234  }
235  LogInfo("Matacq") << "Associating matacq data with orbit id " << matacq_.getOrbitId()
236  << " to dcc event with orbit id " << orbitId << std::endl;
237  if (isLaserEvent) {
239  } else {
241  }
242  } else {
243  if (isLaserEvent) {
244  LogWarning("Matacq") << "No matacq data found for laser event "
245  << "of run " << runNumber << " orbit " << orbitId;
246  }
247  }
248  } else {
249  LogWarning("Matacq") << "No matacq file found for event " << event.id();
250  }
251  }
252  }
253  if (eventSkipCounter_ > 0) { //error occured for this events
254  // and some events will be skipped following
255  // to this error.
256  LogInfo("Matacq") << " [" << now() << "] " << eventSkipCounter_
257  << " next events will be skipped, following to an "
258  << "error on the last processed event, "
259  << "which is expected to be persistant.";
260  }
261  } else {
263  }
264 
265  if (produceRaw_) {
266  if (verbosity_ > 1)
267  cout << "[Matacq " << now() << "] "
268  << "Adding FEDRawDataCollection collection "
269  << " to event.\n";
270  event.put(std::move(rawColl), rawInstanceName_);
271  }
272 
273  if (produceDigis_) {
274  if (verbosity_ > 1)
275  cout << "[Matacq " << now() << "] "
276  << "Adding EcalMatacqDigiCollection collection "
277  << " to event.\n";
278  event.put(std::move(digiColl), digiInstanceName_);
279  }
280 }
281 
282 // #if 0
283 // bool
284 // MatacqProducer::getMatacqEvent(std::ifstream& f,
285 // uint32_t runNumber,
286 // uint32_t orbitId,
287 // bool doWrap,
288 // std::streamoff maxPos){
289 // bool found = false;
290 // streampos startPos = f.tellg();
291 
292 // while(!f.eof()
293 // && !found
294 // && (maxPos<0 || f.tellg()<=maxPos)){
295 // const streamsize headerSize = 8*8;
296 // f.read((char*)&data_[0], headerSize);
297 // if(f.eof()) break;
298 // int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
299 // uint32_t len = MatacqRawEvent::getDccLen(&data_[0], headerSize);
300 // uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
301 // // cout << "Matacq: orbit = " << orb
302 // // << " len = " << len
303 // // << " run = " << run << endl;
304 // if((abs(orb-(int32_t)orbitId) < orbitTolerance_)
305 // && (runNumber==0 || runNumber==run)){
306 // found = true;
307 // //reads the rest of the event:
308 // if(data_.size() < len*8){
309 // throw cms::Exception("Matacq") << "Buffer overflow";
310 // }
311 // f.read((char*)&data_[0]+headerSize, len*8-headerSize);
312 // matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len*8);
313 // } else{
314 // //moves to next event:
315 // f.seekg(len*8 - headerSize, ios::cur);
316 // }
317 // }
318 
319 // f.clear(); //clears eof error to allow seekg
320 // if(doWrap && !found){
321 // f.seekg(0, ios::beg);
322 // found = getMatacqEvent(f, runNumber, orbitId, false, startPos);
323 // }
324 // return found;
325 // }
326 //#endif
327 
328 bool MatacqProducer::getMatacqEvent(uint32_t runNumber, int32_t orbitId, bool fileChange) {
329  filepos_t startPos;
330  if (!mtell(startPos))
331  return false;
332 
333  int32_t startOrb = -1;
334  const size_t headerSize = 8 * 8;
335  if (mread((char*)&data_[0], headerSize, "Reading matacq header", true)) {
336  startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
337  if (startOrb < 0)
338  startOrb = 0;
339  } else {
340  if (verbosity_ > 2) {
341  cout << "[Matacq " << now()
342  << "] Failed to read matacq header. Moved to start of "
343  " the file.\n";
344  }
345  mrewind();
346  if (mread((char*)&data_[0], headerSize, "Reading matacq header", true)) {
347  startPos = 0;
348  startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
349  } else {
350  if (verbosity_ > 2)
351  cout << "[Matacq " << now() << "] Looks like matacq file is empty"
352  << "\n";
353  return false;
354  }
355  }
356 
357  if (verbosity_ > 2)
358  cout << "[Matacq " << now() << "] Last read orbit: " << lastOrb_ << " looking for orbit " << orbitId
359  << ". Current file position: " << startPos << " Orbit at current position: " << startOrb << "\n";
360 
361  // f.clear();
362  bool didCoarseMove = false;
363 
364  //FIXME: case where posEtim_.invalid() is false
365  if (!posEstim_.invalid() && (abs(lastOrb_ - orbitId) > fastRetrievalThresh_)) {
366  filepos_t pos = posEstim_.pos(orbitId);
367 
368  // struct stat st;
369  filepos_t fsize;
370  // if(0==stat(inFileName_.c_str(), &st)){
371  if (msize(fsize)) {
372  // const int64_t fsize = st.st_size;
373  if (0 != posEstim_.eventLength() && pos > fsize) {
374  //estimated position is beyong end of file
375  //-> move to beginning of last event:
376  int64_t evtSize = posEstim_.eventLength() * sizeof(uint64_t);
377  pos = ((int64_t)fsize / evtSize - 1) * evtSize;
378  if (verbosity_ > 2) {
379  cout << "[Matacq " << now()
380  << "] Estimated position was beyond end of file. "
381  "Changed to "
382  << pos << "\n";
383  }
384  }
385  } else {
386  LogWarning("Matacq") << "Failed to access file " << inFileName_ << ".";
387  }
388  if (pos >= 0) {
389  if (verbosity_ > 2)
390  cout << "[Matacq " << now() << "] jumping to estimated position " << pos << "\n";
391  mseek(pos, SEEK_SET, "Jumping to estimated event position");
392  if (mread((char*)&data_[0], headerSize, "Reading matacq header", true)) {
393  didCoarseMove = true;
394  } else {
395  //estimated position might have been beyond the end of the file,
396  //try, with original position:
397  didCoarseMove = false;
398  if (!mread((char*)&data_[0], headerSize, "Reading event header", true)) {
399  return false;
400  }
401  }
402  } else {
403  if (verbosity_)
404  cout << "[Matacq " << now()
405  << "] Event orbit outside of orbit range "
406  "of matacq data file events\n";
407  return false;
408  }
409  }
410 
411  int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
412 
413  if (didCoarseMove) {
414  //autoadjustement of threshold for coarse move:
415  if (abs(orb - orbitId) > fastRetrievalThresh_) {
416  if (verbosity_ > 2)
417  cout << "[Matacq " << now() << "] Fast retrieval threshold increased from " << fastRetrievalThresh_;
418  fastRetrievalThresh_ = 2 * abs(orb - orbitId);
419  if (verbosity_ > 2)
420  cout << " to " << fastRetrievalThresh_ << "\n";
421  }
422 
423  //if coarse move did not improve situation, rolls back:
424  if (startOrb > 0 && (abs(orb - orbitId) > abs(startOrb - orbitId))) {
425  if (verbosity_ > 2)
426  cout << "[Matacq " << now() << "] Estimation (-> orbit " << orb
427  << ") "
428  "was worst than original position (-> orbit "
429  << startOrb << "). Restoring position (" << startPos << ").\n";
430  mseek(startPos, SEEK_SET);
431  mread((char*)&data_[0], headerSize, "Reading event header", true);
432  orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
433  }
434  }
435 
436  bool searchBackward = (orb > orbitId) ? true : false;
437  //BEWARE: len must be signed, because we are using latter in the code (-len)
438  //expression
439  int len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
440 
441  if (len == 0) {
442  cout << "[Matacq " << now() << "] read DCC length is null! Cancels matacq event search "
443  << " and move matacq file pointer to beginning of the file. "
444  << "(" << __FILE__ << ":" << __LINE__ << ")."
445  << "\n";
446  //rewind(f);
447  mrewind();
448  return false;
449  }
450 
451  enum state_t { searching, found, failed } state = searching;
452 
453  while (state == searching) {
454  orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
455  len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
456  uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
457  if (verbosity_ > 3) {
458  filepos_t pos = -1;
459  mtell(pos);
460  cout << "[Matacq " << now() << "] Header read at file position " << pos << ": orbit = " << orb
461  << " len = " << len << "x8 Byte"
462  << " run = " << run << "\n";
463  }
464  if ((abs(orb - orbitId) < orbitTolerance_) && (runNumber == 0 || runNumber == run)) {
465  state = found;
466  lastOrb_ = orb;
467  //reads the rest of the event:
468  if ((int)data_.size() < len * 8) {
469  throw cms::Exception("Matacq") << "Buffer overflow";
470  }
471  if (verbosity_ > 2)
472  cout << "[Matacq " << now()
473  << "] Event found. Reading "
474  " matacq event."
475  << "\n";
476  if (!mread((char*)&data_[0], len * 8, "Reading matacq event")) {
477  if (verbosity_ > 2)
478  cout << "[Matacq " << now() << "] Failed to read matacq event."
479  << "\n";
480  state = failed;
481  }
482  matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len * 8);
483  } else {
484  if ((searchBackward && (orb < orbitId)) || (!searchBackward && (orb > orbitId))) { //search ended
485  lastOrb_ = orb;
486  state = failed;
487  if (verbosity_ > 2)
488  cout << "[Matacq " << now() << "] No matacq data found for run " << run << ", orbit ID " << orbitId << "."
489  << "\n";
490  } else {
491  off_t offset = (searchBackward ? -len : len) * 8;
492  lastOrb_ = orb;
493  if (verbosity_ > 3) {
494  cout << "[Matacq " << now() << "] In matacq file, moving " << abs(offset) << " byte "
495  << (offset > 0 ? "forward" : "backward") << ".\n";
496  }
497 
498  if (mseek(offset, SEEK_CUR, (searchBackward ? "Moving to previous event" : "Moving to next event")) &&
499  mread((char*)&data_[0], headerSize, "Reading event header", true)) {
500  } else {
501  if (!searchBackward)
502  mseek(-len * 8, SEEK_CUR, "Moving to start of last complete event");
503  state = failed;
504  }
505  }
506  }
507  }
508 
509  if (state == found) {
510  filepos_t pos = -1;
511  filepos_t fsize = -1;
512  mtell(pos);
513  msize(fsize);
514  if (pos == fsize - 1) { //last byte.
515  if (verbosity_ > 2) {
516  cout << "[Matacq " << now()
517  << "] Event found was at the end of the file. Moving "
518  "stream position to beginning of this event."
519  << "\n";
520  }
521  mseek(-(int)len * 8 - 1, SEEK_CUR, "Moving to beginning of last matacq event");
522  }
523  }
524  return (state == found);
525 }
526 
527 bool MatacqProducer::getMatacqFile(uint32_t runNumber, uint32_t orbitId, bool* fileChange) {
528  if (openedFileRunNumber_ != 0 && openedFileRunNumber_ == runNumber) {
529  uint32_t firstOrb, lastOrb;
530  bool goodRange = getOrbitRange(firstOrb, lastOrb);
531  // if(orbitId < firstOrb || orbitId > lastOrb) continue;
532  if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
533  if (fileChange != nullptr)
534  *fileChange = false;
535  return misOpened();
536  }
537  }
538 
539  if (fileNames_.empty())
540  return false;
541 
542  const string runNumberFormat = "%08d{,_*}";
543  string sRunNumber = str(boost::format(runNumberFormat) % runNumber);
544  //cout << "Run number string: " << sRunNumber << "\n";
545  bool found = false;
546  string fname;
547  uint32_t maxOrb = 0;
548  //we make two iterations to handle the case where the event is procesed
549  //before the matacq data are available. In such case we would have
550  //orbitId > maxOrb (maxOrb: orbit of last written matacq event)
551  for (int itry = 0; itry < 2 && (orbitId > maxOrb); ++itry) {
552  if (itry > 0) {
553  int n_sec = 1;
554  std::cout << "[Matacq " << now() << "] Event orbit id (" << orbitId
555  << ") goes "
556  "beyound the range of available one. Waiting for "
557  << n_sec
558  << " seconds in case "
559  "it was not written yet to disk.";
560  sleep(n_sec);
561  }
562 
563  for (unsigned i = 0; i < fileNames_.size() && !found; ++i) {
564  fname = fileNames_[i];
565  boost::algorithm::replace_all(fname, "%run_subdir%", runSubDir(runNumber));
566  boost::algorithm::replace_all(fname, "%run_number%", sRunNumber);
567 
568  glob_t g;
569  int rc = glob(fname.c_str(), GLOB_BRACE, nullptr, &g);
570  if (rc) {
571  if (verbosity_ > 1) {
572  switch (rc) {
573  case GLOB_NOSPACE:
574  std::cout << "[Matacq " << now()
575  << "] Running out of memory while calling glob function to look for matacq file paths\n";
576  break;
577  case GLOB_ABORTED:
578  std::cout << "[Matacq " << now()
579  << "] Read error while calling glob function to look for matacq file paths\n";
580  break;
581  case GLOB_NOMATCH:
582  //ok. No message to report.
583  break;
584  }
585  continue;
586  }
587  } //rc
588  for (unsigned iglob = 0; iglob < g.gl_pathc; ++iglob) {
589  char* thePath = g.gl_pathv[iglob];
590  //FIXME: add sanity check on the path
591  static std::atomic<int> nOpenErrors{0};
592  const int maxOpenErrors = 50;
593  if (!mopen(thePath) && nOpenErrors < maxOpenErrors) {
594  std::cout << "[Matacq " << now() << "] Failed to open file " << thePath;
595  ++nOpenErrors;
596  if (nOpenErrors == maxOpenErrors) {
597  std::cout << nOpenErrors << "This is the " << maxOpenErrors
598  << "th occurence of this error. Report of this error is now disabled.\n";
599  } else {
600  std::cout << "\n";
601  }
602  }
603  uint32_t firstOrb;
604  uint32_t lastOrb;
605  bool goodRange = getOrbitRange(firstOrb, lastOrb);
606  if (goodRange && lastOrb > maxOrb)
607  maxOrb = lastOrb;
608  if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
609  found = true;
610  //continue;
611  fname = thePath;
612  if (verbosity_ > 1)
613  std::cout << "[Matacq " << now() << "] Switching to file " << fname << "\n";
614  break;
615  }
616  } //next iglob
617  globfree(&g);
618  } //next filenames
619  } //next itry
620 
621  if (found) {
622  LogInfo("Matacq") << "Uses matacq data file: '" << fname << "'\n";
623  } else {
624  if (verbosity_ >= 0)
625  cout << "[Matacq " << now()
626  << "] no matacq file found "
627  "for run "
628  << runNumber << "\n";
631  if (fileChange != nullptr)
632  *fileChange = false;
633  return false;
634  }
635 
636  if (found) {
638  lastOrb_ = 0;
639  posEstim_.init(this);
640  if (fileChange != nullptr)
641  *fileChange = true;
642  return true;
643  } else {
644  return false;
645  }
646 }
647 
648 uint32_t MatacqProducer::getRunNumber(edm::Event& ev) const { return ev.run(); }
649 
651  //on CVS HEAD (June 4, 08), class Event has a method orbitNumber()
652  //we could use here. The code would be shorten to:
653  //return ev.orbitNumber();
654  //we have to deal with what we have in current CMSSW releases:
657  if (!(rawdata.isValid())) {
658  throw cms::Exception("NotFound") << "No FED raw data collection found. ECAL raw data are "
659  "required to retrieve the orbit ID";
660  }
661 
662  int orbit = 0;
663  for (int id = 601; id <= 654; ++id) {
664  if (!FEDNumbering::inRange(id))
665  continue;
666  const FEDRawData& data = rawdata->FEDData(id);
667  const int orbitIdOffset64 = 3;
668  if (data.size() >= 8 * (orbitIdOffset64 + 1)) { //orbit id is in 4th 64-bit word
669  const unsigned char* pOrbit = data.data() + orbitIdOffset64 * 8;
670  int thisOrbit = pOrbit[0] | (pOrbit[1] << 8) | (pOrbit[2] << 16) | (pOrbit[3] << 24);
671  if (orbit != 0 && thisOrbit != 0 && abs(orbit - thisOrbit) > orbitTolerance_) {
672  //throw cms::Exception("EventCorruption")
673  // << "Orbit ID inconsitency in DCC headers";
674  LogWarning("EventCorruption") << "Orbit ID inconsitency in DCC headers";
675  orbit = 0;
676  break;
677  }
678  if (thisOrbit != 0)
679  orbit = thisOrbit;
680  }
681  }
682 
683  if (orbit == 0) {
684  // throw cms::Exception("NotFound")
685  // << "Failed to retrieve orbit ID of event "<< ev.id();
686  LogWarning("NotFound") << "Failed to retrieve orbit ID of event " << ev.id();
687  }
688  return orbit;
689 }
690 
694  if (!(rawdata.isValid())) {
695  throw cms::Exception("NotFound") << "No FED raw data collection found. ECAL raw data are "
696  "required to retrieve the trigger type";
697  }
698 
700  for (int id = 601; id <= 654; ++id) {
701  if (!FEDNumbering::inRange(id))
702  continue;
703  const FEDRawData& data = rawdata->FEDData(id);
704  const int detailedTrigger32 = 5;
705  if (data.size() >= 4 * (detailedTrigger32 + 1)) {
706  const unsigned char* pTType = data.data() + detailedTrigger32 * 4;
707  int tType = pTType[1] & 0x7;
708  stat.add(tType);
709  }
710  }
711  double p;
712  int tType = stat.result(&p);
713  if (p < 0) {
714  //throw cms::Exception("NotFound") << "No ECAL DCC data found\n";
715  LogWarning("NotFound") << "No ECAL DCC data found\n";
716  tType = -1;
717  }
718  if (p < .8) {
719  //throw cms::Exception("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
720  LogWarning("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
721  tType = -1;
722  }
723  return tType;
724 }
725 
727  mp->mrewind();
728 
729  const size_t headerSize = 8 * 8;
730  unsigned char data[headerSize];
731  if (!mp->mread((char*)data, headerSize)) {
732  if (verbosity_)
733  cout << "[Matacq " << now() << "] reached end of file!\n";
734  firstOrbit_ = eventLength_ = orbitStepMean_ = 0;
735  return;
736  } else {
737  firstOrbit_ = MatacqRawEvent::getOrbitId(data, headerSize);
738  eventLength_ = MatacqRawEvent::getDccLen(data, headerSize);
739  if (verbosity_ > 1)
740  cout << "[Matacq " << now() << "] First event orbit: " << firstOrbit_ << " event length: " << eventLength_
741  << "*8 byte\n";
742  }
743 
744  mp->mrewind();
745 
746  if (eventLength_ == 0) {
747  if (verbosity_)
748  cout << "[Matacq " << now() << "] event length is null!" << endl;
749  return;
750  }
751 
752  filepos_t s;
753  mp->msize(s);
754 
755  //number of complete events:
756  const unsigned nEvents = s / eventLength_ / 8;
757 
758  if (nEvents == 0) {
759  if (verbosity_)
760  cout << "[Matacq " << now() << "] File is empty!" << endl;
761  orbitStepMean_ = 0;
762  return;
763  }
764 
765  if (verbosity_ > 1)
766  cout << "[Matacq " << now() << "] File size: " << s << " Number of events: " << nEvents << endl;
767 
768  //position of last complete events:
769  off_t last = (nEvents - 1) * (off_t)eventLength_ * 8;
770  mp->mseek(last,
771  SEEK_SET,
772  "Moving to beginning of last complete "
773  "matacq event");
774  if (!mp->mread((char*)data, headerSize, "Reading matacq header", true)) {
775  LogWarning("Matacq") << "Fast matacq event retrieval failure. "
776  "Falling back to safe retrieval mode.";
777  orbitStepMean_ = 0;
778  }
779 
780  int32_t lastOrb = MatacqRawEvent::getOrbitId(data, headerSize);
781  int32_t lastLen = MatacqRawEvent::getDccLen(data, headerSize);
782 
783  if (verbosity_ > 1)
784  cout << "[Matacq " << now() << "] Last event orbit: " << lastOrb << " last event length: " << lastLen << endl;
785 
786  //some consistency check
787  if (lastLen != eventLength_) {
788  LogWarning("Matacq")
789  //throw cms::Exception("Matacq")
790  << "Fast matacq event retrieval failure: it looks like "
791  "the matacq file contains events of different sizes.";
792  // " Falling back to safe retrieval mode.";
793  invalid_ = false; //true;
794  orbitStepMean_ = 112; //0;
795  return;
796  }
797 
798  orbitStepMean_ = (lastOrb - firstOrbit_) / nEvents;
799 
800  if (verbosity_ > 1)
801  cout << "[Matacq " << now() << "] Orbit step mean: " << orbitStepMean_ << "\n";
802 
803  invalid_ = false;
804 }
805 
806 int64_t MatacqProducer::PosEstimator::pos(int orb) const {
807  if (orb < firstOrbit_)
808  return -1;
809  uint64_t r = orbitStepMean_ != 0 ? (((uint64_t)(orb - firstOrbit_)) / orbitStepMean_) * eventLength_ * 8 : 0;
810  if (verbosity_ > 2)
811  cout << "[Matacq " << now() << "] Estimated Position for orbit " << orb << ": " << r << endl;
812  return r;
813 }
814 
816  mclose();
817  timeval t;
818  gettimeofday(&t, nullptr);
819  if (logTiming_ && startTime_.tv_sec != 0) {
820  //not using logger, to allow timing with different logging options
821  cout << "[Matacq " << now()
822  << "] Time elapsed between first event and "
823  "destruction of MatacqProducer: "
824  << ((t.tv_sec - startTime_.tv_sec) * 1. + (t.tv_usec - startTime_.tv_usec) * 1.e-6) << "s\n";
825  }
826 }
827 
829  std::ifstream f(orbitOffsetFile_.c_str());
830  if (f.bad()) {
831  throw cms::Exception("Matacq") << "Failed to open orbit ID correction file '" << orbitOffsetFile_ << "'\n";
832  }
833 
834  cout << "[Matacq " << now() << "] "
835  << "Offset to substract to Matacq events Orbit ID: \n"
836  << "#Run Number\t Offset\n";
837 
838  int iline = 0;
839  string s;
840  stringstream buf;
841  while (f.eof()) {
842  getline(f, s);
843  ++iline;
844  if (s[0] == '#') { //comment
845  //skip line:
846  f.ignore(numeric_limits<streamsize>::max(), '\n');
847  continue;
848  }
849  buf.str("");
850  buf << s;
851  int run;
852  int orbit;
853  buf >> run;
854  buf >> orbit;
855  if (buf.bad()) {
856  throw cms::Exception("Matacq") << "Syntax error in Orbit offset file '" << orbitOffsetFile_ << "'";
857  }
858  cout << run << "\t" << orbit << "\n";
859  orbitOffset_.insert(pair<int, int>(run, orbit));
860  }
861 }
862 
863 #ifdef USE_STORAGE_MANAGER
864 bool MatacqProducer::mseek(filepos_t offset, int whence, const char* mess) {
865  if (0 == inFile_.get())
866  return false;
867  try {
869  if (whence == SEEK_SET)
870  wh = Storage::SET;
871  else if (whence == SEEK_CUR)
872  wh = Storage::CURRENT;
873  else if (whence == SEEK_END)
874  wh = Storage::END;
875  else
876  throw cms::Exception("Bug") << "Bug found in " << __FILE__ << ": " << __LINE__ << "\n";
877 
878  inFile_->position(offset, wh);
879  } catch (cms::Exception& e) {
880  if (verbosity_) {
881  cout << "[Matacq " << now() << "] ";
882  if (mess)
883  cout << mess << ". ";
884  cout << "Random access error on input matacq file. ";
885  if (whence == SEEK_SET)
886  cout << "Failed to seek absolute position " << offset;
887  else if (whence == SEEK_CUR)
888  cout << "Failed to move " << offset << " bytes forward";
889  else if (whence == SEEK_END)
890  cout << "Failed to seek position at " << offset << " bytes before end of file";
891  cout << ". Reopening file. " << e.what() << "\n";
893  return false;
894  }
895  }
896  return true;
897 }
898 
900  if (0 == inFile_.get())
901  return false;
902  pos = inFile_->position();
903  return true;
904 }
905 
906 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek) {
907  if (0 == inFile_.get())
908  return false;
909 
910  filepos_t pos = -1;
911  if (!mtell(pos))
912  return false;
913 
914  bool rc = false;
915  try {
916  rc = (n == inFile_->xread(buf, n));
917  } catch (cms::Exception& e) {
918  if (verbosity_) {
919  cout << "[Matacq " << now() << "] ";
920  if (mess)
921  cout << mess << ". ";
922  cout << "Read failure from input matacq file: " << e.what() << "\n";
923  }
924  //recovering from error:
926  mseek(pos);
927  return false;
928  }
929  if (peek) { //asked to restore original file position
930  mseek(pos);
931  }
932  return rc;
933 }
934 
936  if (inFile_.get() == 0)
937  return false;
938  s = inFile_.get()->size();
939  return true;
940 }
941 
943  Storage* file = inFile_.get();
944  if (file == 0)
945  return false;
946  try {
947  file->rewind();
948  } catch (cms::Exception e) {
949  if (verbosity_)
950  cout << "Exception cautgh while rewinding file " << inFileName_ << ": " << e.what() << ". "
951  << "File will be reopened.";
952  return mopen(inFileName_);
953  }
954  return true;
955 }
956 
957 bool MatacqProducer::mcheck(const std::string& name) { return StorageFactory::get()->check(name); }
958 
960  //close already opened file if any:
961  mclose();
962 
963  try {
964  inFile_ = unique_ptr<Storage>(StorageFactory::get()->open(name, IOFlags::OpenRead));
965  inFileName_ = name;
966  } catch (cms::Exception& e) {
967  LogWarning("Matacq") << e.what();
968  inFile_.reset();
969  inFileName_ = "";
970  return false;
971  }
972  return true;
973 }
974 
975 void MatacqProducer::mclose() {
976  if (inFile_.get() != 0) {
977  inFile_->close();
978  inFile_.reset();
979  }
980 }
981 
982 bool MatacqProducer::misOpened() { return inFile_.get() != 0; }
983 
984 bool MatacqProducer::meof() {
985  if (inFile_.get() == 0)
986  return true;
987  return inFile_->eof();
988 }
989 
990 #else //USE_STORAGE_MANAGER not defined
991 bool MatacqProducer::mseek(off_t offset, int whence, const char* mess) {
992  if (nullptr == inFile_)
993  return false;
994  const int rc = fseeko(inFile_, offset, whence);
995  if (rc != 0 && verbosity_) {
996  cout << "[Matacq " << now() << "] ";
997  if (mess)
998  cout << mess << ". ";
999  cout << "Random access error on input matacq file. "
1000  "Rewind file.\n";
1001  mrewind();
1002  }
1003  return rc == 0;
1004 }
1005 
1007  if (nullptr == inFile_)
1008  return false;
1009  pos = ftello(inFile_);
1010  return pos != -1;
1011 }
1012 
1013 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek) {
1014  if (nullptr == inFile_)
1015  return false;
1016  off_t pos = ftello(inFile_);
1017  bool rc = (pos != -1) && (1 == fread(buf, n, 1, inFile_));
1018  if (!rc) {
1019  if (verbosity_) {
1020  cout << "[Matacq " << now() << "] ";
1021  if (mess)
1022  cout << mess << ". ";
1023  cout << "Read failure from input matacq file.\n";
1024  }
1025  clearerr(inFile_);
1026  }
1027  if (peek || !rc) { //need to restore file position
1028  if (0 != fseeko(inFile_, pos, SEEK_SET)) {
1029  if (verbosity_) {
1030  cout << "[Matacq " << now() << "] ";
1031  if (mess)
1032  cout << mess << ". ";
1033  cout << "Failed to restore file position of "
1034  "before read error. Rewind file.\n";
1035  }
1036  //rewind(inFile_.get());
1037  mrewind();
1038  lastOrb_ = 0;
1039  }
1040  }
1041  return rc;
1042 }
1043 
1045  if (nullptr == inFile_)
1046  return false;
1047  struct stat buf;
1048  if (0 != fstat(fileno(inFile_), &buf)) {
1049  s = 0;
1050  return false;
1051  } else {
1052  s = buf.st_size;
1053  return true;
1054  }
1055 }
1056 
1058  if (nullptr == inFile_)
1059  return false;
1060  clearerr(inFile_);
1061  return fseeko(inFile_, 0, SEEK_SET) != 0;
1062 }
1063 
1065  struct stat dummy;
1066  return 0 == stat(name.c_str(), &dummy);
1067  // if(stat(name.c_str(), &dummy)==0){
1068  // return true;
1069  // } else{
1070  // cout << "[Matacq " << now() << "] Failed to stat file '"
1071  // << name.c_str() << "'. "
1072  // << "Error " << errno << ": " << strerror(errno) << "\n";
1073  // return false;
1074  // }
1075 }
1076 
1078  if (inFile_ != nullptr)
1079  mclose();
1080  inFile_ = fopen(name.c_str(), "r");
1081  if (inFile_ != nullptr) {
1082  inFileName_ = name;
1083  return true;
1084  } else {
1085  inFileName_ = "";
1086  return false;
1087  }
1088 }
1089 
1091  if (inFile_ != nullptr)
1092  fclose(inFile_);
1093  inFile_ = nullptr;
1094 }
1095 
1096 bool MatacqProducer::misOpened() { return inFile_ != nullptr; }
1097 
1099  if (nullptr == inFile_)
1100  return true;
1101  return feof(inFile_) == 0;
1102 }
1103 
1104 #endif //USE_STORAGE_MANAGER defined
1105 
1107  int millions = runNumber / (1000 * 1000);
1108  int thousands = (runNumber - millions * 1000 * 1000) / 1000;
1109  int units = runNumber - millions * 1000 * 1000 - thousands * 1000;
1110  return str(boost::format("%03d/%03d/%03d") % millions % thousands % units);
1111 }
1112 
1113 void MatacqProducer::newRun(int prevRun, int newRun) {
1114  runNumber_ = newRun;
1115  eventSkipCounter_ = 0;
1116  logFile_ << "[" << now() << "] Event count for run " << runNumber_ << ": "
1117  << "total: " << stats_.nEvents << ", "
1118  << "Laser event with Matacq data: " << stats_.nLaserEventsWithMatacq << ", "
1119  << "Non laser event (according to DCC header) with Matacq data: " << stats_.nNonLaserEventsWithMatacq << "\n"
1120  << flush;
1121 
1122  stats_.nEvents = 0;
1125 }
1126 
1127 bool MatacqProducer::getOrbitRange(uint32_t& firstOrb, uint32_t& lastOrb) {
1128  filepos_t pos = -1;
1129  filepos_t fsize = -1;
1130  mtell(pos);
1131  msize(fsize);
1132  const unsigned headerSize = 8 * 8;
1133  unsigned char header[headerSize];
1134  //FIXME: Don't we need here to rewind?
1135  mseek(0);
1136  if (!mread((char*)header, headerSize, nullptr, false))
1137  return false;
1138  firstOrb = MatacqRawEvent::getOrbitId(header, headerSize);
1139  int len = (int)MatacqRawEvent::getDccLen(header, headerSize);
1140  //number of complete events. If last event is partially written,
1141  //it won't be included in the count.
1142  unsigned nEvts = fsize / (len * 64);
1143  //Position of last complete event:
1144  filepos_t lastEvtPos = (nEvts - 1) * len * 64;
1145  mseek(lastEvtPos);
1146  mread((char*)header, headerSize, nullptr, false);
1147  lastOrb = MatacqRawEvent::getOrbitId(header, headerSize);
1148 
1149  //restore file position:
1150  mseek(pos);
1151 
1152  return true;
1153 }
static const char runNumber_[]
std::string inFileName_
T getParameter(std::string const &) const
~MatacqProducer() override
uint32_t runNumber_
std::ofstream logFile_
static const int matacqFedId_
static const int orbitTolerance_
std::ofstream timeLog_
Definition: CLHEP.h:16
unsigned getRunNum() const
bool getByToken(EDGetToken token, Handle< PROD > &result) const
Definition: Event.h:525
#define nullptr
bool getOrbitRange(uint32_t &firstOrb, uint32_t &lastOrb)
static unsigned getOrbitId(unsigned char *data, size_t size)
std::vector< std::string > fileNames_
bool ev
char const * what() const override
Definition: Exception.cc:103
bool check(const std::string &url, IOOffset *size=nullptr) const
std::string digiInstanceName_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
Definition: Activities.doc:4
size_t size() const
Lenght of the data buffer in bytes.
Definition: FEDRawData.h:45
uint32_t getRunNumber(edm::Event &ev) const
uint32_t getOrbitId() const
unsigned getDccLen() const
uint32_t getOrbitId(edm::Event &ev) const
Relative
Definition: Storage.h:22
bool mcheck(const std::string &name)
struct MatacqProducer::stats_t stats_
void add(const T &value)
Definition: Majority.h:23
static const StorageFactory * get(void)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
int64_t pos(int orb) const
std::map< uint32_t, uint32_t > orbitOffset_
edm::EDGetTokenT< FEDRawDataCollection > inputRawCollectionToken_
void addMatacqData(edm::Event &event)
static std::string runSubDir(uint32_t runNumber)
RunNumber_t run() const
Definition: Event.h:107
bool mread(char *buf, size_t n, const char *mess=0, bool peek=false)
Abs< T >::type abs(const T &t)
Definition: Abs.h:22
double f[11][100]
bool isValid() const
Definition: HandleBase.h:70
std::string orbitOffsetFile_
void produce(edm::Event &event, const edm::EventSetup &eventSetup) override
int getCalibTriggerType(edm::Event &ev) const
const int nEvts
std::string timeLogFile_
unsigned long long uint64_t
Definition: Time.h:13
MatacqDataFormatter formatter_
static const uint16_t invalid_
Definition: Constants.h:16
uint32_t openedFileRunNumber_
PosEstimator posEstim_
void init(MatacqProducer *mp)
bool mtell(filepos_t &pos)
string fname
main script
static const int bufferSize
static bool inRange(int)
MatacqProducer(const edm::ParameterSet &params)
edm::EventID id() const
Definition: EventBase.h:59
TString units(TString variable, Char_t axis)
HLT enums.
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
static std::string now()
bool mseek(filepos_t offset, int whence=SEEK_SET, const char *mess=0)
MatacqRawEvent matacq_
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:24
std::string rawInstanceName_
void interpretRawData(const FEDRawData &data, EcalMatacqDigiCollection &matacqDigiCollection)
T result(double *proba) const
Definition: Majority.h:28
std::string logFileName_
UInt_t nEvents
Definition: hcalCalib.cc:41
bool msize(filepos_t &s)
std::vector< unsigned char > data_
void newRun(int prevRun, int newRun)
virtual void rewind(void)
Definition: Storage.cc:87
bool getMatacqFile(uint32_t runNumber, uint32_t orbitId, bool *fileChange=0)
static const stats_t stats_init
#define str(s)
def move(src, dest)
Definition: eostools.py:511
Definition: event.py:1
bool mopen(const std::string &name)
std::unique_ptr< Storage > open(const std::string &url, int mode=IOFlags::OpenRead) const
bool getMatacqEvent(uint32_t runNumber, int32_t orbitId, bool fileChange)