|
|
Go to the documentation of this file.
10 #include <sys/types.h>
13 #include <fmt/printf.h>
14 #include <boost/algorithm/string.hpp>
30 using namespace boost;
50 gettimeofday(&
t,
nullptr);
53 strftime(
buf,
sizeof(
buf),
"%F %R %S s", localtime(&
t.tv_sec));
57 buf2 <<
buf <<
" " << ((
t.tv_usec + 500) / 1000) <<
" ms";
64 digiInstanceName_(
params.getParameter<
string>(
"digiInstanceName")),
65 rawInstanceName_(
params.getParameter<
string>(
"rawInstanceName")),
67 disabled_(
params.getParameter<
bool>(
"disabled")),
68 verbosity_(
params.getUntrackedParameter<
int>(
"verbosity", 0)),
69 produceDigis_(
params.getParameter<
bool>(
"produceDigis")),
70 produceRaw_(
params.getParameter<
bool>(
"produceRaw")),
72 mergeRaw_(
params.getParameter<
bool>(
"mergeRaw")),
73 ignoreTriggerType_(
params.getParameter<
bool>(
"ignoreTriggerType")),
77 openedFileRunNumber_(0),
79 fastRetrievalThresh_(0),
80 orbitOffsetFile_(
params.getUntrackedParameter<
std::
string>(
"orbitOffsetFile",
"")),
83 logFileName_(
params.getUntrackedParameter<
std::
string>(
"logFileName",
"matacqProducer.log")),
85 onErrorDisablingEvtCnt_(
params.getParameter<
int>(
"onErrorDisablingEvtCnt")),
86 timeLogFile_(
params.getUntrackedParameter<
std::
string>(
"timeLogFile",
"")),
89 cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer ctor" << endl;
91 gettimeofday(&
timer_,
nullptr);
96 cout <<
"[LaserSorter " <<
now() <<
"] "
97 <<
"Failed to open file " <<
timeLogFile_ <<
" to log timing.\n";
117 <<
"] registering new "
118 "EcalMatacqDigiCollection product with instance name '"
126 <<
"] registering new FEDRawDataCollection "
127 "product with instance name '"
140 cout <<
"[Matacq " <<
now() <<
"] exiting MatacqProducer ctor" << endl;
145 cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer::produce" << endl;
148 gettimeofday(&
t,
nullptr);
150 timeLog_ <<
t.tv_sec <<
"." << setfill(
'0') << setw(3) << (
t.tv_usec + 500) / 1000 << setfill(
' ') <<
"\t"
151 << (
t.tv_usec -
timer_.tv_usec) * 1. + (
t.tv_sec -
timer_.tv_sec) * 1.e6 <<
"\t";
168 gettimeofday(&
t,
nullptr);
178 std::unique_ptr<FEDRawDataCollection> rawColl;
181 rawColl = std::make_unique<FEDRawDataCollection>(*sourceColl);
183 rawColl = std::make_unique<FEDRawDataCollection>();
187 auto digiColl = std::make_unique<EcalMatacqDigiCollection>();
209 <<
". No orbit correction will be applied.";
215 LogInfo(
"Matacq") <<
"Matacq data file found for "
216 <<
"run " <<
runNumber <<
" orbit " << orbitId;
224 LogWarning(
"Matacq") <<
" Error in Matacq event fragment length! "
227 <<
"Matacq data will not be included for this event.\n";
234 <<
" to dcc event with orbit id " << orbitId << std::endl;
242 LogWarning(
"Matacq") <<
"No matacq data found for laser event "
243 <<
"of run " <<
runNumber <<
" orbit " << orbitId;
247 LogWarning(
"Matacq") <<
"No matacq file found for event " <<
event.id();
255 <<
" next events will be skipped, following to an "
256 <<
"error on the last processed event, "
257 <<
"which is expected to be persistant.";
265 cout <<
"[Matacq " <<
now() <<
"] "
266 <<
"Adding FEDRawDataCollection collection "
273 cout <<
"[Matacq " <<
now() <<
"] "
274 <<
"Adding EcalMatacqDigiCollection collection "
328 if (!
mtell(startPos))
331 int32_t startOrb = -1;
332 const size_t headerSize = 8 * 8;
333 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
340 <<
"] Failed to read matacq header. Moved to start of "
344 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
349 cout <<
"[Matacq " <<
now() <<
"] Looks like matacq file is empty"
356 cout <<
"[Matacq " <<
now() <<
"] Last read orbit: " <<
lastOrb_ <<
" looking for orbit " << orbitId
357 <<
". Current file position: " << startPos <<
" Orbit at current position: " << startOrb <<
"\n";
360 bool didCoarseMove =
false;
375 pos = ((int64_t)fsize / evtSize - 1) * evtSize;
378 <<
"] Estimated position was beyond end of file. "
388 cout <<
"[Matacq " <<
now() <<
"] jumping to estimated position " <<
pos <<
"\n";
389 mseek(
pos, SEEK_SET,
"Jumping to estimated event position");
390 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
391 didCoarseMove =
true;
395 didCoarseMove =
false;
396 if (!
mread((
char*)&
data_[0], headerSize,
"Reading event header",
true)) {
403 <<
"] Event orbit outside of orbit range "
404 "of matacq data file events\n";
422 if (startOrb > 0 && (
abs(orb - orbitId) >
abs(startOrb - orbitId))) {
424 cout <<
"[Matacq " <<
now() <<
"] Estimation (-> orbit " << orb
426 "was worst than original position (-> orbit "
427 << startOrb <<
"). Restoring position (" << startPos <<
").\n";
428 mseek(startPos, SEEK_SET);
429 mread((
char*)&
data_[0], headerSize,
"Reading event header",
true);
434 bool searchBackward = (orb > orbitId) ?
true :
false;
440 cout <<
"[Matacq " <<
now() <<
"] read DCC length is null! Cancels matacq event search "
441 <<
" and move matacq file pointer to beginning of the file. "
442 <<
"(" << __FILE__ <<
":" << __LINE__ <<
")."
451 while (
state == searching) {
458 cout <<
"[Matacq " <<
now() <<
"] Header read at file position " <<
pos <<
": orbit = " << orb
459 <<
" len = " << len <<
"x8 Byte"
460 <<
" run = " <<
run <<
"\n";
466 if ((
int)
data_.size() < len * 8) {
471 <<
"] Event found. Reading "
474 if (!
mread((
char*)&
data_[0], len * 8,
"Reading matacq event")) {
476 cout <<
"[Matacq " <<
now() <<
"] Failed to read matacq event."
482 if ((searchBackward && (orb < orbitId)) || (!searchBackward && (orb > orbitId))) {
486 cout <<
"[Matacq " <<
now() <<
"] No matacq data found for run " <<
run <<
", orbit ID " << orbitId <<
"."
489 off_t
offset = (searchBackward ? -len : len) * 8;
492 cout <<
"[Matacq " <<
now() <<
"] In matacq file, moving " <<
abs(
offset) <<
" byte "
493 << (
offset > 0 ?
"forward" :
"backward") <<
".\n";
496 if (
mseek(
offset, SEEK_CUR, (searchBackward ?
"Moving to previous event" :
"Moving to next event")) &&
497 mread((
char*)&
data_[0], headerSize,
"Reading event header",
true)) {
500 mseek(-len * 8, SEEK_CUR,
"Moving to start of last complete event");
512 if (
pos == fsize - 1) {
515 <<
"] Event found was at the end of the file. Moving "
516 "stream position to beginning of this event."
519 mseek(-(
int)len * 8 - 1, SEEK_CUR,
"Moving to beginning of last matacq event");
527 uint32_t firstOrb, lastOrb;
530 if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
531 if (fileChange !=
nullptr)
540 const string runNumberFormat =
"%08d{,_*}";
541 string sRunNumber = fmt::sprintf(runNumberFormat,
runNumber);
550 for (
int itry = 0; itry < 1 && (orbitId > maxOrb); ++itry) {
553 std::cout <<
"[Matacq " <<
now() <<
"] Event orbit id (" << orbitId
555 "beyound the range of available one. Waiting for "
557 <<
" seconds in case "
558 "it was not written yet to disk.";
565 boost::algorithm::replace_all(
fname,
"%run_number%", sRunNumber);
568 int rc = glob(
fname.c_str(), GLOB_BRACE,
nullptr, &
g);
574 <<
"] Running out of memory while calling glob function to look for matacq file paths\n";
578 <<
"] Read error while calling glob function to look for matacq file paths\n";
587 for (
unsigned iglob = 0; iglob <
g.gl_pathc; ++iglob) {
588 char* thePath =
g.gl_pathv[iglob];
590 static std::atomic<int> nOpenErrors{0};
591 const int maxOpenErrors = 50;
592 if (!
mopen(thePath) && nOpenErrors < maxOpenErrors) {
593 std::cout <<
"[Matacq " <<
now() <<
"] Failed to open file " << thePath;
595 if (nOpenErrors == maxOpenErrors) {
596 std::cout << nOpenErrors <<
"This is the " << maxOpenErrors
597 <<
"th occurence of this error. Report of this error is now disabled.\n";
605 std::cout <<
"Get orbit range " << (goodRange ?
"succeeded" :
"failed") <<
". Range: " << firstOrb <<
"..."
607 if (goodRange && lastOrb > maxOrb)
609 if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
623 LogInfo(
"Matacq") <<
"Uses matacq data file: '" <<
fname <<
"'\n";
627 <<
"] no matacq file found "
629 <<
runNumber <<
", orbit " << orbitId <<
"\n";
632 if (fileChange !=
nullptr)
641 if (fileChange !=
nullptr)
659 throw cms::Exception(
"NotFound") <<
"No FED raw data collection found. ECAL raw data are "
660 "required to retrieve the orbit ID";
664 for (
int id = 601;
id <= 654; ++
id) {
668 const int orbitIdOffset64 = 3;
669 if (
data.size() >= 8 * (orbitIdOffset64 + 1)) {
670 const unsigned char* pOrbit =
data.data() + orbitIdOffset64 * 8;
671 int thisOrbit = pOrbit[0] | (pOrbit[1] << 8) | (pOrbit[2] << 16) | (pOrbit[3] << 24);
675 LogWarning(
"EventCorruption") <<
"Orbit ID inconsitency in DCC headers";
687 LogWarning(
"NotFound") <<
"Failed to retrieve orbit ID of event " <<
ev.id();
696 throw cms::Exception(
"NotFound") <<
"No FED raw data collection found. ECAL raw data are "
697 "required to retrieve the trigger type";
701 for (
int id = 601;
id <= 654; ++
id) {
705 const int detailedTrigger32 = 5;
706 if (
data.size() >= 4 * (detailedTrigger32 + 1)) {
707 const unsigned char* pTType =
data.data() + detailedTrigger32 * 4;
708 int tType = pTType[1] & 0x7;
713 int tType =
stat.result(&
p);
716 LogWarning(
"NotFound") <<
"No ECAL DCC data found\n";
721 LogWarning(
"EventCorruption") <<
"Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
730 const size_t headerSize = 8 * 8;
731 unsigned char data[headerSize];
732 if (!mp->
mread((
char*)
data, headerSize)) {
734 cout <<
"[Matacq " <<
now() <<
"] reached end of file!\n";
749 cout <<
"[Matacq " <<
now() <<
"] event length is null!" << endl;
758 cout <<
"[Matacq " <<
now() <<
"] File is missing!" << endl;
763 cout <<
"[Matacq " <<
now() <<
"] File is empty!" << endl;
772 cout <<
"[Matacq " <<
now() <<
"] File size: " <<
s <<
" Number of events: " <<
nEvents << endl;
778 "Moving to beginning of last complete "
780 if (!mp->
mread((
char*)
data, headerSize,
"Reading matacq header",
true)) {
781 LogWarning(
"Matacq") <<
"Fast matacq event retrieval failure. "
782 "Falling back to safe retrieval mode.";
790 cout <<
"[Matacq " <<
now() <<
"] Last event orbit: " << lastOrb <<
" last event length: " << lastLen << endl;
796 <<
"Fast matacq event retrieval failure: it looks like "
797 "the matacq file contains events of different sizes.";
813 if (orb < firstOrbit_)
815 uint64_t r = orbitStepMean_ != 0 ? (((
uint64_t)(orb - firstOrbit_)) / orbitStepMean_) * eventLength_ * 8 : 0;
817 cout <<
"[Matacq " <<
now() <<
"] Estimated Position for orbit " << orb <<
": " <<
r << endl;
824 gettimeofday(&
t,
nullptr);
828 <<
"] Time elapsed between first event and "
829 "destruction of MatacqProducer: "
840 cout <<
"[Matacq " <<
now() <<
"] "
841 <<
"Offset to substract to Matacq events Orbit ID: \n"
842 <<
"#Run Number\t Offset\n";
864 cout <<
run <<
"\t" << orbit <<
"\n";
869 #ifdef USE_STORAGE_MANAGER
875 if (whence == SEEK_SET)
877 else if (whence == SEEK_CUR)
879 else if (whence == SEEK_END)
882 throw cms::Exception(
"Bug") <<
"Bug found in " << __FILE__ <<
": " << __LINE__ <<
"\n";
887 cout <<
"[Matacq " <<
now() <<
"] ";
889 cout << mess <<
". ";
890 cout <<
"Random access error on input matacq file. ";
891 if (whence == SEEK_SET)
892 cout <<
"Failed to seek absolute position " <<
offset;
893 else if (whence == SEEK_CUR)
894 cout <<
"Failed to move " <<
offset <<
" bytes forward";
895 else if (whence == SEEK_END)
896 cout <<
"Failed to seek position at " <<
offset <<
" bytes before end of file";
897 cout <<
". Reopening file. " <<
e.what() <<
"\n";
925 cout <<
"[Matacq " <<
now() <<
"] ";
927 cout << mess <<
". ";
928 cout <<
"Read failure from input matacq file: " <<
e.what() <<
"\n";
956 cout <<
"Exception cautgh while rewinding file " <<
inFileName_ <<
": " <<
e.what() <<
". "
957 <<
"File will be reopened.";
996 #else //USE_STORAGE_MANAGER not defined
1002 cout <<
"[Matacq " <<
now() <<
"] ";
1004 cout << mess <<
". ";
1005 cout <<
"Random access error on input matacq file. "
1026 cout <<
"[Matacq " <<
now() <<
"] ";
1028 cout << mess <<
". ";
1029 cout <<
"Read failure from input matacq file.\n";
1036 cout <<
"[Matacq " <<
now() <<
"] ";
1038 cout << mess <<
". ";
1039 cout <<
"Failed to restore file position of "
1040 "before read error. Rewind file.\n";
1067 return fseeko(
inFile_, 0, SEEK_SET) != 0;
1110 #endif //USE_STORAGE_MANAGER defined
1113 int millions =
runNumber / (1000 * 1000);
1114 int thousands = (
runNumber - millions * 1000 * 1000) / 1000;
1115 int units =
runNumber - millions * 1000 * 1000 - thousands * 1000;
1116 return fmt::sprintf(
"%03d/%03d/%03d", millions, thousands,
units);
1138 const unsigned headerSize = 8 * 8;
1139 unsigned char header[headerSize];
1142 if (!
mread((
char*)
header, headerSize,
nullptr,
false))
1148 unsigned nEvts = fsize / (len * 8);
edm::EDGetTokenT< FEDRawDataCollection > inputRawCollectionToken_
MatacqProducer(const edm::ParameterSet ¶ms)
uint32_t getOrbitId(edm::Event &ev) const
struct MatacqProducer::stats_t stats_
bool getMatacqEvent(uint32_t runNumber, int32_t orbitId, bool fileChange)
std::map< uint32_t, uint32_t > orbitOffset_
const static stats_t stats_init
bool mread(char *buf, size_t n, const char *mess=nullptr, bool peek=false)
std::string rawInstanceName_
bool mcheck(const std::string &name)
bool getMatacqFile(uint32_t runNumber, uint32_t orbitId, bool *fileChange=nullptr)
Log< level::Info, false > LogInfo
bool mtell(filepos_t &pos)
MatacqDataFormatter formatter_
static unsigned getOrbitId(unsigned char *data, size_t size)
Log< level::Warning, false > LogWarning
unsigned getRunNum() const
void produce(edm::Event &event, const edm::EventSetup &eventSetup) override
void init(MatacqProducer *mp)
TString units(TString variable, Char_t axis)
const static int orbitTolerance_
uint32_t openedFileRunNumber_
unsigned getDccLen() const
bool getOrbitRange(uint32_t &firstOrb, uint32_t &lastOrb)
static const StorageFactory * get(void)
const FEDRawData & FEDData(int fedid) const
Retrieve the data for the given FED.
std::vector< std::string > fileNames_
bool check(const std::string &url, IOOffset *size=nullptr) const
std::string orbitOffsetFile_
static std::string runSubDir(uint32_t runNumber)
int onErrorDisablingEvtCnt_
uint32_t getRunNumber(edm::Event &ev) const
std::vector< unsigned char > data_
static const int matacqFedId_
size_t size() const
Length of the data buffer in bytes.
~MatacqProducer() override
void newRun(int prevRun, int newRun)
double nNonLaserEventsWithMatacq
void addMatacqData(edm::Event &event)
double nLaserEventsWithMatacq
int64_t pos(int orb) const
uint32_t getOrbitId() const
bool mseek(filepos_t offset, int whence=SEEK_SET, const char *mess=nullptr)
char data[epos_bytes_allocation]
unsigned long long uint64_t
Abs< T >::type abs(const T &t)
bool mopen(const std::string &name)
static const char runNumber_[]
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
static const int bufferSize
int getCalibTriggerType(edm::Event &ev) const
std::string digiInstanceName_