|
|
Go to the documentation of this file.
10 #include <sys/types.h>
13 #include <fmt/printf.h>
14 #include <boost/algorithm/string.hpp>
30 using namespace boost;
50 gettimeofday(&
t,
nullptr);
53 strftime(
buf,
sizeof(
buf),
"%F %R %S s", localtime(&
t.tv_sec));
57 buf2 <<
buf <<
" " << ((
t.tv_usec + 500) / 1000) <<
" ms";
64 digiInstanceName_(
params.getParameter<
string>(
"digiInstanceName")),
65 rawInstanceName_(
params.getParameter<
string>(
"rawInstanceName")),
67 disabled_(
params.getParameter<
bool>(
"disabled")),
68 verbosity_(
params.getUntrackedParameter<
int>(
"verbosity", 0)),
69 produceDigis_(
params.getParameter<
bool>(
"produceDigis")),
70 produceRaw_(
params.getParameter<
bool>(
"produceRaw")),
72 mergeRaw_(
params.getParameter<
bool>(
"mergeRaw")),
73 ignoreTriggerType_(
params.getParameter<
bool>(
"ignoreTriggerType")),
77 openedFileRunNumber_(0),
79 fastRetrievalThresh_(0),
80 orbitOffsetFile_(
params.getUntrackedParameter<
std::
string>(
"orbitOffsetFile",
"")),
83 logFileName_(
params.getUntrackedParameter<
std::
string>(
"logFileName",
"matacqProducer.log")),
85 onErrorDisablingEvtCnt_(
params.getParameter<
int>(
"onErrorDisablingEvtCnt")),
86 timeLogFile_(
params.getUntrackedParameter<
std::
string>(
"timeLogFile",
"")),
89 cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer ctor" << endl;
91 gettimeofday(&
timer_,
nullptr);
96 cout <<
"[LaserSorter " <<
now() <<
"] "
97 <<
"Failed to open file " <<
timeLogFile_ <<
" to log timing.\n";
117 <<
"] registering new "
118 "EcalMatacqDigiCollection product with instance name '"
126 <<
"] registering new FEDRawDataCollection "
127 "product with instance name '"
140 cout <<
"[Matacq " <<
now() <<
"] exiting MatacqProducer ctor" << endl;
145 cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer::produce" << endl;
148 gettimeofday(&
t,
nullptr);
150 timeLog_ <<
t.tv_sec <<
"." << setfill(
'0') << setw(3) << (
t.tv_usec + 500) / 1000 << setfill(
' ') <<
"\t"
151 << (
t.tv_usec -
timer_.tv_usec) * 1. + (
t.tv_sec -
timer_.tv_sec) * 1.e6 <<
"\t";
168 gettimeofday(&
t,
nullptr);
178 std::unique_ptr<FEDRawDataCollection> rawColl;
181 rawColl = std::make_unique<FEDRawDataCollection>(*sourceColl);
183 rawColl = std::make_unique<FEDRawDataCollection>();
187 auto digiColl = std::make_unique<EcalMatacqDigiCollection>();
209 <<
". No orbit correction will be applied.";
215 LogInfo(
"Matacq") <<
"Matacq data file found for "
216 <<
"run " <<
runNumber <<
" orbit " << orbitId;
224 LogWarning(
"Matacq") <<
" Error in Matacq event fragment length! "
227 <<
"Matacq data will not be included for this event.\n";
234 <<
" to dcc event with orbit id " << orbitId << std::endl;
242 LogWarning(
"Matacq") <<
"No matacq data found for laser event "
243 <<
"of run " <<
runNumber <<
" orbit " << orbitId;
247 LogWarning(
"Matacq") <<
"No matacq file found for event " <<
event.id();
255 <<
" next events will be skipped, following to an "
256 <<
"error on the last processed event, "
257 <<
"which is expected to be persistant.";
265 cout <<
"[Matacq " <<
now() <<
"] "
266 <<
"Adding FEDRawDataCollection collection "
273 cout <<
"[Matacq " <<
now() <<
"] "
274 <<
"Adding EcalMatacqDigiCollection collection "
328 if (!
mtell(startPos))
331 int32_t startOrb = -1;
332 const size_t headerSize = 8 * 8;
333 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
340 <<
"] Failed to read matacq header. Moved to start of "
344 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
349 cout <<
"[Matacq " <<
now() <<
"] Looks like matacq file is empty"
356 cout <<
"[Matacq " <<
now() <<
"] Last read orbit: " <<
lastOrb_ <<
" looking for orbit " << orbitId
357 <<
". Current file position: " << startPos <<
" Orbit at current position: " << startOrb <<
"\n";
360 bool didCoarseMove =
false;
375 pos = ((int64_t)fsize / evtSize - 1) * evtSize;
378 <<
"] Estimated position was beyond end of file. "
388 cout <<
"[Matacq " <<
now() <<
"] jumping to estimated position " <<
pos <<
"\n";
389 mseek(
pos, SEEK_SET,
"Jumping to estimated event position");
390 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
391 didCoarseMove =
true;
395 didCoarseMove =
false;
396 if (!
mread((
char*)&
data_[0], headerSize,
"Reading event header",
true)) {
403 <<
"] Event orbit outside of orbit range "
404 "of matacq data file events\n";
422 if (startOrb > 0 && (
abs(orb - orbitId) >
abs(startOrb - orbitId))) {
424 cout <<
"[Matacq " <<
now() <<
"] Estimation (-> orbit " << orb
426 "was worst than original position (-> orbit "
427 << startOrb <<
"). Restoring position (" << startPos <<
").\n";
428 mseek(startPos, SEEK_SET);
429 mread((
char*)&
data_[0], headerSize,
"Reading event header",
true);
434 bool searchBackward = (orb > orbitId) ?
true :
false;
440 cout <<
"[Matacq " <<
now() <<
"] read DCC length is null! Cancels matacq event search "
441 <<
" and move matacq file pointer to beginning of the file. "
442 <<
"(" << __FILE__ <<
":" << __LINE__ <<
")."
451 while (
state == searching) {
458 cout <<
"[Matacq " <<
now() <<
"] Header read at file position " <<
pos <<
": orbit = " << orb
459 <<
" len = " << len <<
"x8 Byte"
460 <<
" run = " <<
run <<
"\n";
466 if ((
int)
data_.size() < len * 8) {
471 <<
"] Event found. Reading "
474 if (!
mread((
char*)&
data_[0], len * 8,
"Reading matacq event")) {
476 cout <<
"[Matacq " <<
now() <<
"] Failed to read matacq event."
482 if ((searchBackward && (orb < orbitId)) || (!searchBackward && (orb > orbitId))) {
486 cout <<
"[Matacq " <<
now() <<
"] No matacq data found for run " <<
run <<
", orbit ID " << orbitId <<
"."
489 off_t
offset = (searchBackward ? -len : len) * 8;
492 cout <<
"[Matacq " <<
now() <<
"] In matacq file, moving " <<
abs(
offset) <<
" byte "
493 << (
offset > 0 ?
"forward" :
"backward") <<
".\n";
496 if (
mseek(
offset, SEEK_CUR, (searchBackward ?
"Moving to previous event" :
"Moving to next event")) &&
497 mread((
char*)&
data_[0], headerSize,
"Reading event header",
true)) {
500 mseek(-len * 8, SEEK_CUR,
"Moving to start of last complete event");
512 if (
pos == fsize - 1) {
515 <<
"] Event found was at the end of the file. Moving "
516 "stream position to beginning of this event."
519 mseek(-(
int)len * 8 - 1, SEEK_CUR,
"Moving to beginning of last matacq event");
527 uint32_t firstOrb, lastOrb;
530 if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
531 if (fileChange !=
nullptr)
540 const string runNumberFormat =
"%08d{,_*}";
541 string sRunNumber = fmt::sprintf(runNumberFormat,
runNumber);
549 for (
int itry = 0; itry < 2 && (orbitId > maxOrb); ++itry) {
552 std::cout <<
"[Matacq " <<
now() <<
"] Event orbit id (" << orbitId
554 "beyound the range of available one. Waiting for "
556 <<
" seconds in case "
557 "it was not written yet to disk.";
564 boost::algorithm::replace_all(
fname,
"%run_number%", sRunNumber);
567 int rc = glob(
fname.c_str(), GLOB_BRACE,
nullptr, &
g);
573 <<
"] Running out of memory while calling glob function to look for matacq file paths\n";
577 <<
"] Read error while calling glob function to look for matacq file paths\n";
586 for (
unsigned iglob = 0; iglob <
g.gl_pathc; ++iglob) {
587 char* thePath =
g.gl_pathv[iglob];
589 static std::atomic<int> nOpenErrors{0};
590 const int maxOpenErrors = 50;
591 if (!
mopen(thePath) && nOpenErrors < maxOpenErrors) {
592 std::cout <<
"[Matacq " <<
now() <<
"] Failed to open file " << thePath;
594 if (nOpenErrors == maxOpenErrors) {
595 std::cout << nOpenErrors <<
"This is the " << maxOpenErrors
596 <<
"th occurence of this error. Report of this error is now disabled.\n";
604 if (goodRange && lastOrb > maxOrb)
606 if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
620 LogInfo(
"Matacq") <<
"Uses matacq data file: '" <<
fname <<
"'\n";
624 <<
"] no matacq file found "
629 if (fileChange !=
nullptr)
638 if (fileChange !=
nullptr)
656 throw cms::Exception(
"NotFound") <<
"No FED raw data collection found. ECAL raw data are "
657 "required to retrieve the orbit ID";
661 for (
int id = 601;
id <= 654; ++
id) {
665 const int orbitIdOffset64 = 3;
666 if (
data.size() >= 8 * (orbitIdOffset64 + 1)) {
667 const unsigned char* pOrbit =
data.data() + orbitIdOffset64 * 8;
668 int thisOrbit = pOrbit[0] | (pOrbit[1] << 8) | (pOrbit[2] << 16) | (pOrbit[3] << 24);
672 LogWarning(
"EventCorruption") <<
"Orbit ID inconsitency in DCC headers";
684 LogWarning(
"NotFound") <<
"Failed to retrieve orbit ID of event " <<
ev.id();
693 throw cms::Exception(
"NotFound") <<
"No FED raw data collection found. ECAL raw data are "
694 "required to retrieve the trigger type";
698 for (
int id = 601;
id <= 654; ++
id) {
702 const int detailedTrigger32 = 5;
703 if (
data.size() >= 4 * (detailedTrigger32 + 1)) {
704 const unsigned char* pTType =
data.data() + detailedTrigger32 * 4;
705 int tType = pTType[1] & 0x7;
710 int tType =
stat.result(&
p);
713 LogWarning(
"NotFound") <<
"No ECAL DCC data found\n";
718 LogWarning(
"EventCorruption") <<
"Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
727 const size_t headerSize = 8 * 8;
728 unsigned char data[headerSize];
729 if (!mp->
mread((
char*)
data, headerSize)) {
731 cout <<
"[Matacq " <<
now() <<
"] reached end of file!\n";
746 cout <<
"[Matacq " <<
now() <<
"] event length is null!" << endl;
758 cout <<
"[Matacq " <<
now() <<
"] File is empty!" << endl;
764 cout <<
"[Matacq " <<
now() <<
"] File size: " <<
s <<
" Number of events: " <<
nEvents << endl;
770 "Moving to beginning of last complete "
772 if (!mp->
mread((
char*)
data, headerSize,
"Reading matacq header",
true)) {
773 LogWarning(
"Matacq") <<
"Fast matacq event retrieval failure. "
774 "Falling back to safe retrieval mode.";
782 cout <<
"[Matacq " <<
now() <<
"] Last event orbit: " << lastOrb <<
" last event length: " << lastLen << endl;
788 <<
"Fast matacq event retrieval failure: it looks like "
789 "the matacq file contains events of different sizes.";
805 if (orb < firstOrbit_)
807 uint64_t r = orbitStepMean_ != 0 ? (((
uint64_t)(orb - firstOrbit_)) / orbitStepMean_) * eventLength_ * 8 : 0;
809 cout <<
"[Matacq " <<
now() <<
"] Estimated Position for orbit " << orb <<
": " <<
r << endl;
816 gettimeofday(&
t,
nullptr);
820 <<
"] Time elapsed between first event and "
821 "destruction of MatacqProducer: "
832 cout <<
"[Matacq " <<
now() <<
"] "
833 <<
"Offset to substract to Matacq events Orbit ID: \n"
834 <<
"#Run Number\t Offset\n";
856 cout <<
run <<
"\t" << orbit <<
"\n";
861 #ifdef USE_STORAGE_MANAGER
867 if (whence == SEEK_SET)
869 else if (whence == SEEK_CUR)
871 else if (whence == SEEK_END)
874 throw cms::Exception(
"Bug") <<
"Bug found in " << __FILE__ <<
": " << __LINE__ <<
"\n";
879 cout <<
"[Matacq " <<
now() <<
"] ";
881 cout << mess <<
". ";
882 cout <<
"Random access error on input matacq file. ";
883 if (whence == SEEK_SET)
884 cout <<
"Failed to seek absolute position " <<
offset;
885 else if (whence == SEEK_CUR)
886 cout <<
"Failed to move " <<
offset <<
" bytes forward";
887 else if (whence == SEEK_END)
888 cout <<
"Failed to seek position at " <<
offset <<
" bytes before end of file";
889 cout <<
". Reopening file. " <<
e.what() <<
"\n";
917 cout <<
"[Matacq " <<
now() <<
"] ";
919 cout << mess <<
". ";
920 cout <<
"Read failure from input matacq file: " <<
e.what() <<
"\n";
948 cout <<
"Exception cautgh while rewinding file " <<
inFileName_ <<
": " <<
e.what() <<
". "
949 <<
"File will be reopened.";
988 #else //USE_STORAGE_MANAGER not defined
994 cout <<
"[Matacq " <<
now() <<
"] ";
996 cout << mess <<
". ";
997 cout <<
"Random access error on input matacq file. "
1018 cout <<
"[Matacq " <<
now() <<
"] ";
1020 cout << mess <<
". ";
1021 cout <<
"Read failure from input matacq file.\n";
1028 cout <<
"[Matacq " <<
now() <<
"] ";
1030 cout << mess <<
". ";
1031 cout <<
"Failed to restore file position of "
1032 "before read error. Rewind file.\n";
1059 return fseeko(
inFile_, 0, SEEK_SET) != 0;
1102 #endif //USE_STORAGE_MANAGER defined
1105 int millions =
runNumber / (1000 * 1000);
1106 int thousands = (
runNumber - millions * 1000 * 1000) / 1000;
1107 int units =
runNumber - millions * 1000 * 1000 - thousands * 1000;
1108 return fmt::sprintf(
"%03d/%03d/%03d", millions, thousands,
units);
1130 const unsigned headerSize = 8 * 8;
1131 unsigned char header[headerSize];
1134 if (!
mread((
char*)
header, headerSize,
nullptr,
false))
1140 unsigned nEvts = fsize / (len * 64);
edm::EDGetTokenT< FEDRawDataCollection > inputRawCollectionToken_
MatacqProducer(const edm::ParameterSet ¶ms)
uint32_t getOrbitId(edm::Event &ev) const
struct MatacqProducer::stats_t stats_
bool getMatacqEvent(uint32_t runNumber, int32_t orbitId, bool fileChange)
std::map< uint32_t, uint32_t > orbitOffset_
const static stats_t stats_init
bool mread(char *buf, size_t n, const char *mess=nullptr, bool peek=false)
std::string rawInstanceName_
bool mcheck(const std::string &name)
bool getMatacqFile(uint32_t runNumber, uint32_t orbitId, bool *fileChange=nullptr)
Log< level::Info, false > LogInfo
bool mtell(filepos_t &pos)
MatacqDataFormatter formatter_
static unsigned getOrbitId(unsigned char *data, size_t size)
Log< level::Warning, false > LogWarning
unsigned getRunNum() const
void produce(edm::Event &event, const edm::EventSetup &eventSetup) override
void init(MatacqProducer *mp)
TString units(TString variable, Char_t axis)
const static int orbitTolerance_
uint32_t openedFileRunNumber_
unsigned getDccLen() const
bool getOrbitRange(uint32_t &firstOrb, uint32_t &lastOrb)
static const StorageFactory * get(void)
const FEDRawData & FEDData(int fedid) const
Retrieve data for the given FED.
std::vector< std::string > fileNames_
bool check(const std::string &url, IOOffset *size=nullptr) const
std::string orbitOffsetFile_
static std::string runSubDir(uint32_t runNumber)
int onErrorDisablingEvtCnt_
uint32_t getRunNumber(edm::Event &ev) const
std::vector< unsigned char > data_
static const int matacqFedId_
size_t size() const
Length of the data buffer in bytes.
~MatacqProducer() override
void newRun(int prevRun, int newRun)
double nNonLaserEventsWithMatacq
void addMatacqData(edm::Event &event)
double nLaserEventsWithMatacq
int64_t pos(int orb) const
uint32_t getOrbitId() const
bool mseek(filepos_t offset, int whence=SEEK_SET, const char *mess=nullptr)
char data[epos_bytes_allocation]
unsigned long long uint64_t
Abs< T >::type abs(const T &t)
bool mopen(const std::string &name)
static const char runNumber_[]
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
static const int bufferSize
int getCalibTriggerType(edm::Event &ev) const
std::string digiInstanceName_