12 #include <boost/algorithm/string.hpp> 13 #include <boost/format.hpp> 25 #include <sys/types.h> 32 using namespace boost;
53 gettimeofday(&t,
nullptr);
56 strftime(buf,
sizeof(buf),
"%F %R %S s", localtime(&t.tv_sec));
57 buf[
sizeof(buf)-1] = 0;
60 buf2 << buf <<
" " << ((t.tv_usec+500)/1000) <<
" ms";
67 fileNames_(params.getParameter<
std::vector<
std::
string> >(
"fileNames")),
68 digiInstanceName_(params.getParameter<
string>(
"digiInstanceName")),
69 rawInstanceName_(params.getParameter<
string>(
"rawInstanceName")),
70 timing_(params.getUntrackedParameter<
bool>(
"timing",
false)),
71 disabled_(params.getParameter<
bool>(
"disabled")),
72 verbosity_(params.getUntrackedParameter<
int>(
"verbosity", 0)),
73 produceDigis_(params.getParameter<
bool>(
"produceDigis")),
74 produceRaw_(params.getParameter<
bool>(
"produceRaw")),
75 inputRawCollection_(params.getParameter<
edm::
InputTag>(
"inputRawCollection")),
76 mergeRaw_(params.getParameter<
bool>(
"mergeRaw")),
77 ignoreTriggerType_(params.getParameter<
bool>(
"ignoreTriggerType")),
81 openedFileRunNumber_(0),
83 fastRetrievalThresh_(0),
84 orbitOffsetFile_(params.getUntrackedParameter<
std::
string>(
"orbitOffsetFile",
88 logFileName_(params.getUntrackedParameter<
std::
string>(
"logFileName",
89 "matacqProducer.log")),
91 onErrorDisablingEvtCnt_(params.getParameter<
int>(
"onErrorDisablingEvtCnt")),
92 timeLogFile_(params.getUntrackedParameter<
std::
string>(
"timeLogFile",
"")),
95 if(
verbosity_>=4)
cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer ctor" << endl;
97 gettimeofday(&
timer_,
nullptr);
102 cout <<
"[LaserSorter " <<
now() <<
"] " 103 <<
"Failed to open file " <<
timeLogFile_ <<
" to log timing.\n";
124 "EcalMatacqDigiCollection product with instance name '" 130 if(
verbosity_>0)
cout <<
"[Matacq " <<
now() <<
"] registering new FEDRawDataCollection " 131 "product with instance name '" 143 if(
verbosity_>=4)
cout <<
"[Matacq " <<
now() <<
"] exiting MatacqProducer ctor" << endl;
149 if(
verbosity_>=4)
cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer::produce" << endl;
152 gettimeofday(&t,
nullptr);
155 << setfill(
'0') << setw(3) << (t.tv_usec+500)/1000 << setfill(
' ')<<
"\t" 156 << (t.tv_usec -
timer_.tv_usec)*1.
157 + (t.tv_sec -
timer_.tv_sec)*1.e6 <<
"\t";
172 gettimeofday(&t,
nullptr);
174 + (t.tv_sec -
timer_.tv_sec)*1.e6 <<
"\n";
185 std::unique_ptr<FEDRawDataCollection> rawColl;
188 rawColl = std::make_unique<FEDRawDataCollection>(*sourceColl);
190 rawColl = std::make_unique<FEDRawDataCollection>();
194 auto digiColl = std::make_unique<EcalMatacqDigiCollection>();
212 LogInfo(
"Matacq") <<
"Run " << runNumber <<
"\t Orbit " << orbitId <<
"\n";
216 map<uint32_t,uint32_t>::iterator it =
orbitOffset_.find(runNumber);
218 LogWarning(
"Matacq") <<
"Orbit offset not found for run " 220 <<
". No orbit correction will be applied.";
226 LogInfo(
"Matacq") <<
"Matacq data file found for " 227 <<
"run " << runNumber <<
" orbit " << orbitId;
235 LogWarning(
"Matacq") <<
" Error in Matacq event fragment length! " 237 <<
"*8 Bytes, Parsed len: " 239 <<
"Matacq data will not be included for this event.\n";
246 LogInfo(
"Matacq") <<
"Associating matacq data with orbit id " 248 <<
" to dcc event with orbit id " 249 << orbitId << std::endl;
257 LogWarning(
"Matacq") <<
"No matacq data found for laser event " 258 <<
"of run " << runNumber <<
" orbit " 263 LogWarning(
"Matacq") <<
"No matacq file found for event " 273 <<
" next events will be skipped, following to an " 274 <<
"error on the last processed event, " 275 <<
"which is expected to be persistant.";
283 <<
"Adding FEDRawDataCollection collection " 290 <<
"Adding EcalMatacqDigiCollection collection " 347 if(!
mtell(startPos))
return false;
349 int32_t startOrb = -1;
350 const size_t headerSize = 8*8;
351 if(
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)){
353 if(startOrb<0) startOrb = 0;
356 cout <<
"[Matacq " <<
now() <<
"] Failed to read matacq header. Moved to start of " 360 if(
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)){
364 if(
verbosity_>2)
cout <<
"[Matacq " <<
now() <<
"] Looks like matacq file is empty" 371 <<
" looking for orbit " << orbitId
372 <<
". Current file position: " << startPos
373 <<
" Orbit at current position: " << startOrb <<
"\n";
376 bool didCoarseMove =
false;
392 pos = ((int64_t)fsize/evtSize-1)*evtSize;
394 cout <<
"[Matacq " <<
now() <<
"] Estimated position was beyond end of file. " 395 "Changed to " << pos <<
"\n";
402 if(
verbosity_>2)
cout <<
"[Matacq " <<
now() <<
"] jumping to estimated position " 404 mseek(pos, SEEK_SET,
"Jumping to estimated event position");
405 if(
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)){
406 didCoarseMove =
true;
410 didCoarseMove =
false;
411 if(!
mread((
char*)&
data_[0], headerSize,
"Reading event header",
true)){
416 if(
verbosity_)
cout <<
"[Matacq " <<
now() <<
"] Event orbit outside of orbit range " 417 "of matacq data file events\n";
427 if(
verbosity_>2)
cout <<
"[Matacq " <<
now() <<
"] Fast retrieval threshold increased from " 429 fastRetrievalThresh_ = 2*
abs(orb-orbitId);
435 && (
abs(orb-orbitId) >
abs(startOrb-orbitId))){
436 if(
verbosity_>2)
cout <<
"[Matacq " <<
now() <<
"] Estimation (-> orbit " << orb <<
") " 437 "was worst than original position (-> orbit " 439 <<
"). Restoring position (" << startPos <<
").\n";
440 mseek(startPos, SEEK_SET);
441 mread((
char*)&
data_[0], headerSize,
"Reading event header",
true);
446 bool searchBackward = (orb>orbitId)?
true:
false;
452 cout <<
"[Matacq " <<
now() <<
"] read DCC length is null! Cancels matacq event search " 453 <<
" and move matacq file pointer to beginning of the file. " 454 <<
"(" << __FILE__ <<
":" << __LINE__ <<
")." 461 enum state_t { searching,
found,
failed } state = searching;
463 while(state == searching){
470 cout <<
"[Matacq " <<
now() <<
"] Header read at file position " 472 <<
": orbit = " << orb
473 <<
" len = " << len <<
"x8 Byte" 474 <<
" run = " << run <<
"\n";
477 && (runNumber==0 || runNumber==run)){
481 if((
int)
data_.size() < len*8){
485 " matacq event." <<
"\n";
486 if(!
mread((
char*)&
data_[0], len*8,
"Reading matacq event")){
493 if((searchBackward && (orb < orbitId))
494 || (!searchBackward && (orb > orbitId))){
498 <<
"] No matacq data found for run " << run
499 <<
", orbit ID " << orbitId <<
"." <<
"\n";
501 off_t
offset = (searchBackward?-len:len)*8;
504 cout <<
"[Matacq " <<
now() <<
"] In matacq file, moving " 505 <<
abs(offset) <<
" byte " << (offset>0?
"forward":
"backward")
509 if(
mseek(offset, SEEK_CUR,
510 (searchBackward?
"Moving to previous event":
511 "Moving to next event"))
512 &&
mread((
char*)&
data_[0], headerSize,
"Reading event header",
515 if(!searchBackward)
mseek(-len*8, SEEK_CUR,
516 "Moving to start of last complete event");
530 cout <<
"[Matacq " <<
now() <<
"] Event found was at the end of the file. Moving " 531 "stream position to beginning of this event." 534 mseek(-(
int)len*8-1, SEEK_CUR,
535 "Moving to beginning of last matacq event");
538 return (state==
found);
547 uint32_t firstOrb, lastOrb;
550 if(goodRange && firstOrb <= orbitId && orbitId <= lastOrb){
551 if(fileChange!=
nullptr) *fileChange =
false;
558 const string runNumberFormat =
"%08d{,_*}";
567 for(
int itry = 0; itry < 2 && (orbitId > maxOrb); ++itry){
570 std::cout <<
"[Matacq " <<
now() <<
"] Event orbit id (" << orbitId <<
") goes " 571 "beyound the range of available one. Waiting for " << n_sec <<
" seconds in case " 572 "it was not written yet to disk.";
578 boost::algorithm::replace_all(fname,
"%run_subdir%",
580 boost::algorithm::replace_all(fname,
"%run_number%", sRunNumber);
583 int rc = glob(fname.c_str(), GLOB_BRACE,
nullptr, &
g);
588 std::cout <<
"[Matacq " <<
now() <<
"] Running out of memory while calling glob function to look for matacq file paths\n";
591 std::cout <<
"[Matacq " <<
now() <<
"] Read error while calling glob function to look for matacq file paths\n";
600 for(
unsigned iglob = 0; iglob < g.gl_pathc; ++iglob){
601 char* thePath = g.gl_pathv[iglob];
603 static std::atomic<int> nOpenErrors {0};
604 const int maxOpenErrors = 50;
605 if(!
mopen(thePath) && nOpenErrors < maxOpenErrors){
606 std::cout <<
"[Matacq " <<
now() <<
"] Failed to open file " << thePath;
608 if(nOpenErrors == maxOpenErrors){
609 std::cout << nOpenErrors <<
"This is the " << maxOpenErrors
610 <<
"th occurence of this error. Report of this error is now disabled.\n";
618 if(goodRange && lastOrb > maxOrb) maxOrb = lastOrb;
619 if(goodRange && firstOrb <= orbitId && orbitId <= lastOrb){
632 LogInfo(
"Matacq") <<
"Uses matacq data file: '" << fname <<
"'\n";
635 "for run " << runNumber <<
"\n";
638 if(fileChange!=
nullptr) *fileChange =
false;
646 if(fileChange!=
nullptr) *fileChange =
true;
667 <<
"No FED raw data collection found. ECAL raw data are " 668 "required to retrieve the orbit ID";
672 for(
int id=601;
id<=654; ++
id){
675 const int orbitIdOffset64 = 3;
676 if(data.
size()>=8*(orbitIdOffset64+1)){
677 const unsigned char* pOrbit = data.
data() + orbitIdOffset64*8;
678 int thisOrbit = pOrbit[0]
686 <<
"Orbit ID inconsitency in DCC headers";
690 if(thisOrbit!=0) orbit = thisOrbit;
697 LogWarning(
"NotFound") <<
"Failed to retrieve orbit ID of event " 708 <<
"No FED raw data collection found. ECAL raw data are " 709 "required to retrieve the trigger type";
713 for(
int id=601;
id<=654; ++
id){
716 const int detailedTrigger32 = 5;
717 if(data.
size()>=4*(detailedTrigger32+1)){
718 const unsigned char* pTType = data.
data() + detailedTrigger32*4;
719 int tType = pTType[1] & 0x7;
724 int tType = stat.
result(&p);
727 LogWarning(
"NotFound") <<
"No ECAL DCC data found\n";
732 LogWarning(
"EventCorruption") <<
"Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
741 const size_t headerSize = 8*8;
742 unsigned char data[headerSize];
743 if(!mp->
mread((
char*)data, headerSize)){
745 firstOrbit_ = eventLength_ = orbitStepMean_ = 0;
750 if(
verbosity_>1)
cout <<
"[Matacq " <<
now() <<
"] First event orbit: " << firstOrbit_
751 <<
" event length: " << eventLength_
766 const unsigned nEvents = s/eventLength_/8;
775 <<
" Number of events: " << nEvents << endl;
778 off_t
last = (nEvents-1)*(off_t)eventLength_*8;
779 mp->
mseek(last, SEEK_SET,
"Moving to beginning of last complete " 781 if(!mp->
mread((
char*) data, headerSize,
"Reading matacq header",
true)){
782 LogWarning(
"Matacq") <<
"Fast matacq event retrieval failure. " 783 "Falling back to safe retrieval mode.";
790 if(
verbosity_>1)
cout <<
"[Matacq " <<
now() <<
"] Last event orbit: " << lastOrb
791 <<
" last event length: " << lastLen << endl;
794 if(lastLen!=eventLength_){
797 <<
"Fast matacq event retrieval failure: it looks like " 798 "the matacq file contains events of different sizes.";
801 orbitStepMean_ = 112;
805 orbitStepMean_ = (lastOrb - firstOrbit_)/nEvents;
807 if(
verbosity_>1)
cout <<
"[Matacq " <<
now() <<
"] Orbit step mean: " << orbitStepMean_
814 if(orb<firstOrbit_)
return -1;
816 (((
uint64_t)(orb-firstOrbit_))/orbitStepMean_)*eventLength_*8
818 if(
verbosity_>2)
cout <<
"[Matacq " <<
now() <<
"] Estimated Position for orbit " << orb
819 <<
": " << r << endl;
826 gettimeofday(&t,
nullptr);
829 cout <<
"[Matacq " <<
now() <<
"] Time elapsed between first event and " 830 "destruction of MatacqProducer: " 832 + (t.tv_usec-
startTime_.tv_usec)*1.e-6) <<
"s\n";
840 <<
"Failed to open orbit ID correction file '" 844 cout <<
"[Matacq " <<
now() <<
"] " 845 <<
"Offset to substract to Matacq events Orbit ID: \n" 846 <<
"#Run Number\t Offset\n";
867 <<
"Syntax error in Orbit offset file '" 870 cout << run <<
"\t" << orbit <<
"\n";
875 #ifdef USE_STORAGE_MANAGER 877 if(0==
inFile_.get())
return false;
884 << __FILE__ <<
": "<< __LINE__ <<
"\n";
889 cout <<
"[Matacq " <<
now() <<
"] ";
890 if(mess)
cout << mess <<
". ";
891 cout <<
"Random access error on input matacq file. ";
892 if(whence==SEEK_SET)
cout <<
"Failed to seek absolute position " <<
offset;
893 else if(whence==SEEK_CUR)
cout <<
"Failed to move " << offset <<
" bytes forward";
894 else if(whence==SEEK_END)
cout <<
"Failed to seek position at " << offset <<
" bytes before end of file";
895 cout <<
". Reopening file. " << e.
what() <<
"\n";
904 if(0==
inFile_.get())
return false;
910 if(0==
inFile_.get())
return false;
913 if(!
mtell(pos))
return false;
920 cout <<
"[Matacq " <<
now() <<
"] ";
921 if(mess)
cout << mess <<
". ";
922 cout <<
"Read failure from input matacq file: " 937 if(
inFile_.get()==0)
return false;
944 if(file==0)
return false;
950 <<
"File will be reopened.";
990 if(
inFile_.get()==0)
return true;
994 #else //USE_STORAGE_MANAGER not defined 996 if(
nullptr==
inFile_)
return false;
997 const int rc = fseeko(
inFile_, offset, whence);
999 cout <<
"[Matacq " <<
now() <<
"] ";
1000 if(mess)
cout << mess <<
". ";
1001 cout <<
"Random access error on input matacq file. " 1009 if(
nullptr==
inFile_)
return false;
1016 if(
nullptr==
inFile_)
return false;
1018 bool rc = (pos!=-1) && (1==fread(buf, n, 1,
inFile_));
1021 cout <<
"[Matacq " <<
now() <<
"] ";
1022 if(mess)
cout << mess <<
". ";
1023 cout <<
"Read failure from input matacq file.\n";
1028 if(0!=fseeko(
inFile_, pos, SEEK_SET)){
1030 cout <<
"[Matacq " <<
now() <<
"] ";
1031 if(mess)
cout << mess <<
". ";
1032 cout <<
"Failed to restore file position of " 1033 "before read error. Rewind file.\n";
1044 if(
nullptr==
inFile_)
return false;
1046 if(0!=fstat(fileno(
inFile_), &buf)){
1056 if(
nullptr==
inFile_)
return false;
1058 return fseeko(
inFile_, 0, SEEK_SET)!=0;
1063 return 0==
stat(name.c_str(), &dummy);
1076 inFile_ = fopen(name.c_str(),
"r");
1096 if(
nullptr==
inFile_)
return true;
1100 #endif //USE_STORAGE_MANAGER defined 1103 int millions = runNumber / (1000*1000);
1104 int thousands = (runNumber-millions*1000*1000) / 1000;
1105 int units = runNumber-millions*1000*1000 - thousands*1000;
1106 return str(
boost::format(
"%03d/%03d/%03d") % millions % thousands % units);
1112 logFile_ <<
"[" <<
now() <<
"] Event count for run " 1115 <<
"Laser event with Matacq data: " 1117 <<
"Non laser event (according to DCC header) with Matacq data: " 1132 const unsigned headerSize = 8*8;
1133 unsigned char header[headerSize];
1136 if(!
mread((
char*)header, headerSize,
nullptr,
false))
return false;
1141 unsigned nEvts = fsize / (len*64);
1143 filepos_t lastEvtPos = (nEvts - 1) * len * 64;
1145 mread((
char*)header, headerSize,
nullptr,
false);
static const char runNumber_[]
bool check(const std::string &url, IOOffset *size=0) const
T getParameter(std::string const &) const
~MatacqProducer() override
std::map< uint32_t, uint32_t > orbitOffset_
static const int matacqFedId_
static const int orbitTolerance_
unsigned getRunNum() const
bool getByToken(EDGetToken token, Handle< PROD > &result) const
bool getOrbitRange(uint32_t &firstOrb, uint32_t &lastOrb)
static unsigned getOrbitId(unsigned char *data, size_t size)
std::vector< std::string > fileNames_
char const * what() const override
std::string digiInstanceName_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
size_t size() const
Length of the data buffer in bytes.
uint32_t getRunNumber(edm::Event &ev) const
uint32_t getOrbitId() const
unsigned getDccLen() const
uint32_t getOrbitId(edm::Event &ev) const
bool mcheck(const std::string &name)
struct MatacqProducer::stats_t stats_
static const StorageFactory * get(void)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
int64_t pos(int orb) const
edm::EDGetTokenT< FEDRawDataCollection > inputRawCollectionToken_
void addMatacqData(edm::Event &event)
double nNonLaserEventsWithMatacq
static std::string runSubDir(uint32_t runNumber)
int onErrorDisablingEvtCnt_
bool mread(char *buf, size_t n, const char *mess=0, bool peek=false)
Abs< T >::type abs(const T &t)
format
Some error handling for the usage.
double nLaserEventsWithMatacq
std::string orbitOffsetFile_
void produce(edm::Event &event, const edm::EventSetup &eventSetup) override
int getCalibTriggerType(edm::Event &ev) const
unsigned long long uint64_t
MatacqDataFormatter formatter_
static const uint16_t invalid_
uint32_t openedFileRunNumber_
void init(MatacqProducer *mp)
bool mtell(filepos_t &pos)
static const int bufferSize
MatacqProducer(const edm::ParameterSet ¶ms)
TString units(TString variable, Char_t axis)
char data[epos_bytes_allocation]
bool mseek(filepos_t offset, int whence=SEEK_SET, const char *mess=0)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
std::string rawInstanceName_
T result(double *proba) const
std::vector< unsigned char > data_
void newRun(int prevRun, int newRun)
virtual void rewind(void)
bool getMatacqFile(uint32_t runNumber, uint32_t orbitId, bool *fileChange=0)
static const stats_t stats_init
bool mopen(const std::string &name)
std::unique_ptr< Storage > open(const std::string &url, int mode=IOFlags::OpenRead) const
bool getMatacqEvent(uint32_t runNumber, int32_t orbitId, bool fileChange)