|
|
Go to the documentation of this file.
12 #include <boost/algorithm/string.hpp>
13 #include <boost/format.hpp>
25 #include <sys/types.h>
32 using namespace boost;
52 gettimeofday(&
t,
nullptr);
55 strftime(
buf,
sizeof(
buf),
"%F %R %S s", localtime(&
t.tv_sec));
59 buf2 <<
buf <<
" " << ((
t.tv_usec + 500) / 1000) <<
" ms";
66 digiInstanceName_(
params.getParameter<
string>(
"digiInstanceName")),
67 rawInstanceName_(
params.getParameter<
string>(
"rawInstanceName")),
69 disabled_(
params.getParameter<
bool>(
"disabled")),
70 verbosity_(
params.getUntrackedParameter<
int>(
"verbosity", 0)),
71 produceDigis_(
params.getParameter<
bool>(
"produceDigis")),
72 produceRaw_(
params.getParameter<
bool>(
"produceRaw")),
74 mergeRaw_(
params.getParameter<
bool>(
"mergeRaw")),
75 ignoreTriggerType_(
params.getParameter<
bool>(
"ignoreTriggerType")),
79 openedFileRunNumber_(0),
81 fastRetrievalThresh_(0),
82 orbitOffsetFile_(
params.getUntrackedParameter<
std::
string>(
"orbitOffsetFile",
"")),
85 logFileName_(
params.getUntrackedParameter<
std::
string>(
"logFileName",
"matacqProducer.log")),
87 onErrorDisablingEvtCnt_(
params.getParameter<
int>(
"onErrorDisablingEvtCnt")),
88 timeLogFile_(
params.getUntrackedParameter<
std::
string>(
"timeLogFile",
"")),
91 cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer ctor" << endl;
93 gettimeofday(&
timer_,
nullptr);
98 cout <<
"[LaserSorter " <<
now() <<
"] "
99 <<
"Failed to open file " <<
timeLogFile_ <<
" to log timing.\n";
119 <<
"] registering new "
120 "EcalMatacqDigiCollection product with instance name '"
128 <<
"] registering new FEDRawDataCollection "
129 "product with instance name '"
142 cout <<
"[Matacq " <<
now() <<
"] exiting MatacqProducer ctor" << endl;
147 cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer::produce" << endl;
150 gettimeofday(&
t,
nullptr);
152 timeLog_ <<
t.tv_sec <<
"." << setfill(
'0') << setw(3) << (
t.tv_usec + 500) / 1000 << setfill(
' ') <<
"\t"
153 << (
t.tv_usec -
timer_.tv_usec) * 1. + (
t.tv_sec -
timer_.tv_sec) * 1.e6 <<
"\t";
170 gettimeofday(&
t,
nullptr);
180 std::unique_ptr<FEDRawDataCollection> rawColl;
183 rawColl = std::make_unique<FEDRawDataCollection>(*sourceColl);
185 rawColl = std::make_unique<FEDRawDataCollection>();
189 auto digiColl = std::make_unique<EcalMatacqDigiCollection>();
211 <<
". No orbit correction will be applied.";
217 LogInfo(
"Matacq") <<
"Matacq data file found for "
218 <<
"run " <<
runNumber <<
" orbit " << orbitId;
226 LogWarning(
"Matacq") <<
" Error in Matacq event fragment length! "
229 <<
"Matacq data will not be included for this event.\n";
236 <<
" to dcc event with orbit id " << orbitId << std::endl;
244 LogWarning(
"Matacq") <<
"No matacq data found for laser event "
245 <<
"of run " <<
runNumber <<
" orbit " << orbitId;
249 LogWarning(
"Matacq") <<
"No matacq file found for event " <<
event.id();
257 <<
" next events will be skipped, following to an "
258 <<
"error on the last processed event, "
259 <<
"which is expected to be persistant.";
267 cout <<
"[Matacq " <<
now() <<
"] "
268 <<
"Adding FEDRawDataCollection collection "
275 cout <<
"[Matacq " <<
now() <<
"] "
276 <<
"Adding EcalMatacqDigiCollection collection "
330 if (!
mtell(startPos))
333 int32_t startOrb = -1;
334 const size_t headerSize = 8 * 8;
335 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
342 <<
"] Failed to read matacq header. Moved to start of "
346 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
351 cout <<
"[Matacq " <<
now() <<
"] Looks like matacq file is empty"
358 cout <<
"[Matacq " <<
now() <<
"] Last read orbit: " <<
lastOrb_ <<
" looking for orbit " << orbitId
359 <<
". Current file position: " << startPos <<
" Orbit at current position: " << startOrb <<
"\n";
362 bool didCoarseMove =
false;
377 pos = ((int64_t)fsize / evtSize - 1) * evtSize;
380 <<
"] Estimated position was beyond end of file. "
390 cout <<
"[Matacq " <<
now() <<
"] jumping to estimated position " <<
pos <<
"\n";
391 mseek(
pos, SEEK_SET,
"Jumping to estimated event position");
392 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
393 didCoarseMove =
true;
397 didCoarseMove =
false;
398 if (!
mread((
char*)&
data_[0], headerSize,
"Reading event header",
true)) {
405 <<
"] Event orbit outside of orbit range "
406 "of matacq data file events\n";
424 if (startOrb > 0 && (
abs(orb - orbitId) >
abs(startOrb - orbitId))) {
426 cout <<
"[Matacq " <<
now() <<
"] Estimation (-> orbit " << orb
428 "was worst than original position (-> orbit "
429 << startOrb <<
"). Restoring position (" << startPos <<
").\n";
430 mseek(startPos, SEEK_SET);
431 mread((
char*)&
data_[0], headerSize,
"Reading event header",
true);
436 bool searchBackward = (orb > orbitId) ?
true :
false;
442 cout <<
"[Matacq " <<
now() <<
"] read DCC length is null! Cancels matacq event search "
443 <<
" and move matacq file pointer to beginning of the file. "
444 <<
"(" << __FILE__ <<
":" << __LINE__ <<
")."
451 enum state_t { searching,
found,
failed } state = searching;
453 while (state == searching) {
460 cout <<
"[Matacq " <<
now() <<
"] Header read at file position " <<
pos <<
": orbit = " << orb
461 <<
" len = " << len <<
"x8 Byte"
462 <<
" run = " <<
run <<
"\n";
468 if ((
int)
data_.size() < len * 8) {
473 <<
"] Event found. Reading "
476 if (!
mread((
char*)&
data_[0], len * 8,
"Reading matacq event")) {
478 cout <<
"[Matacq " <<
now() <<
"] Failed to read matacq event."
484 if ((searchBackward && (orb < orbitId)) || (!searchBackward && (orb > orbitId))) {
488 cout <<
"[Matacq " <<
now() <<
"] No matacq data found for run " <<
run <<
", orbit ID " << orbitId <<
"."
491 off_t
offset = (searchBackward ? -len : len) * 8;
494 cout <<
"[Matacq " <<
now() <<
"] In matacq file, moving " <<
abs(
offset) <<
" byte "
495 << (
offset > 0 ?
"forward" :
"backward") <<
".\n";
498 if (
mseek(
offset, SEEK_CUR, (searchBackward ?
"Moving to previous event" :
"Moving to next event")) &&
499 mread((
char*)&
data_[0], headerSize,
"Reading event header",
true)) {
502 mseek(-len * 8, SEEK_CUR,
"Moving to start of last complete event");
509 if (state ==
found) {
514 if (
pos == fsize - 1) {
517 <<
"] Event found was at the end of the file. Moving "
518 "stream position to beginning of this event."
521 mseek(-(
int)len * 8 - 1, SEEK_CUR,
"Moving to beginning of last matacq event");
524 return (state ==
found);
529 uint32_t firstOrb, lastOrb;
532 if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
533 if (fileChange !=
nullptr)
542 const string runNumberFormat =
"%08d{,_*}";
551 for (
int itry = 0; itry < 2 && (orbitId > maxOrb); ++itry) {
554 std::cout <<
"[Matacq " <<
now() <<
"] Event orbit id (" << orbitId
556 "beyound the range of available one. Waiting for "
558 <<
" seconds in case "
559 "it was not written yet to disk.";
566 boost::algorithm::replace_all(
fname,
"%run_number%", sRunNumber);
569 int rc = glob(
fname.c_str(), GLOB_BRACE,
nullptr, &
g);
575 <<
"] Running out of memory while calling glob function to look for matacq file paths\n";
579 <<
"] Read error while calling glob function to look for matacq file paths\n";
588 for (
unsigned iglob = 0; iglob <
g.gl_pathc; ++iglob) {
589 char* thePath =
g.gl_pathv[iglob];
591 static std::atomic<int> nOpenErrors{0};
592 const int maxOpenErrors = 50;
593 if (!
mopen(thePath) && nOpenErrors < maxOpenErrors) {
594 std::cout <<
"[Matacq " <<
now() <<
"] Failed to open file " << thePath;
596 if (nOpenErrors == maxOpenErrors) {
597 std::cout << nOpenErrors <<
"This is the " << maxOpenErrors
598 <<
"th occurence of this error. Report of this error is now disabled.\n";
606 if (goodRange && lastOrb > maxOrb)
608 if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
622 LogInfo(
"Matacq") <<
"Uses matacq data file: '" <<
fname <<
"'\n";
626 <<
"] no matacq file found "
631 if (fileChange !=
nullptr)
640 if (fileChange !=
nullptr)
658 throw cms::Exception(
"NotFound") <<
"No FED raw data collection found. ECAL raw data are "
659 "required to retrieve the orbit ID";
663 for (
int id = 601;
id <= 654; ++
id) {
667 const int orbitIdOffset64 = 3;
668 if (
data.size() >= 8 * (orbitIdOffset64 + 1)) {
669 const unsigned char* pOrbit =
data.data() + orbitIdOffset64 * 8;
670 int thisOrbit = pOrbit[0] | (pOrbit[1] << 8) | (pOrbit[2] << 16) | (pOrbit[3] << 24);
674 LogWarning(
"EventCorruption") <<
"Orbit ID inconsitency in DCC headers";
686 LogWarning(
"NotFound") <<
"Failed to retrieve orbit ID of event " <<
ev.id();
695 throw cms::Exception(
"NotFound") <<
"No FED raw data collection found. ECAL raw data are "
696 "required to retrieve the trigger type";
700 for (
int id = 601;
id <= 654; ++
id) {
704 const int detailedTrigger32 = 5;
705 if (
data.size() >= 4 * (detailedTrigger32 + 1)) {
706 const unsigned char* pTType =
data.data() + detailedTrigger32 * 4;
707 int tType = pTType[1] & 0x7;
712 int tType =
stat.result(&
p);
715 LogWarning(
"NotFound") <<
"No ECAL DCC data found\n";
720 LogWarning(
"EventCorruption") <<
"Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
729 const size_t headerSize = 8 * 8;
730 unsigned char data[headerSize];
731 if (!mp->
mread((
char*)
data, headerSize)) {
733 cout <<
"[Matacq " <<
now() <<
"] reached end of file!\n";
748 cout <<
"[Matacq " <<
now() <<
"] event length is null!" << endl;
760 cout <<
"[Matacq " <<
now() <<
"] File is empty!" << endl;
766 cout <<
"[Matacq " <<
now() <<
"] File size: " <<
s <<
" Number of events: " <<
nEvents << endl;
772 "Moving to beginning of last complete "
774 if (!mp->
mread((
char*)
data, headerSize,
"Reading matacq header",
true)) {
775 LogWarning(
"Matacq") <<
"Fast matacq event retrieval failure. "
776 "Falling back to safe retrieval mode.";
784 cout <<
"[Matacq " <<
now() <<
"] Last event orbit: " << lastOrb <<
" last event length: " << lastLen << endl;
790 <<
"Fast matacq event retrieval failure: it looks like "
791 "the matacq file contains events of different sizes.";
807 if (orb < firstOrbit_)
809 uint64_t r = orbitStepMean_ != 0 ? (((
uint64_t)(orb - firstOrbit_)) / orbitStepMean_) * eventLength_ * 8 : 0;
811 cout <<
"[Matacq " <<
now() <<
"] Estimated Position for orbit " << orb <<
": " <<
r << endl;
818 gettimeofday(&
t,
nullptr);
822 <<
"] Time elapsed between first event and "
823 "destruction of MatacqProducer: "
834 cout <<
"[Matacq " <<
now() <<
"] "
835 <<
"Offset to substract to Matacq events Orbit ID: \n"
836 <<
"#Run Number\t Offset\n";
858 cout <<
run <<
"\t" << orbit <<
"\n";
863 #ifdef USE_STORAGE_MANAGER
869 if (whence == SEEK_SET)
871 else if (whence == SEEK_CUR)
873 else if (whence == SEEK_END)
876 throw cms::Exception(
"Bug") <<
"Bug found in " << __FILE__ <<
": " << __LINE__ <<
"\n";
881 cout <<
"[Matacq " <<
now() <<
"] ";
883 cout << mess <<
". ";
884 cout <<
"Random access error on input matacq file. ";
885 if (whence == SEEK_SET)
886 cout <<
"Failed to seek absolute position " <<
offset;
887 else if (whence == SEEK_CUR)
888 cout <<
"Failed to move " <<
offset <<
" bytes forward";
889 else if (whence == SEEK_END)
890 cout <<
"Failed to seek position at " <<
offset <<
" bytes before end of file";
891 cout <<
". Reopening file. " <<
e.what() <<
"\n";
919 cout <<
"[Matacq " <<
now() <<
"] ";
921 cout << mess <<
". ";
922 cout <<
"Read failure from input matacq file: " <<
e.what() <<
"\n";
950 cout <<
"Exception cautgh while rewinding file " <<
inFileName_ <<
": " <<
e.what() <<
". "
951 <<
"File will be reopened.";
990 #else //USE_STORAGE_MANAGER not defined
996 cout <<
"[Matacq " <<
now() <<
"] ";
998 cout << mess <<
". ";
999 cout <<
"Random access error on input matacq file. "
1020 cout <<
"[Matacq " <<
now() <<
"] ";
1022 cout << mess <<
". ";
1023 cout <<
"Read failure from input matacq file.\n";
1030 cout <<
"[Matacq " <<
now() <<
"] ";
1032 cout << mess <<
". ";
1033 cout <<
"Failed to restore file position of "
1034 "before read error. Rewind file.\n";
1061 return fseeko(
inFile_, 0, SEEK_SET) != 0;
1104 #endif //USE_STORAGE_MANAGER defined
1107 int millions =
runNumber / (1000 * 1000);
1108 int thousands = (
runNumber - millions * 1000 * 1000) / 1000;
1109 int units =
runNumber - millions * 1000 * 1000 - thousands * 1000;
1132 const unsigned headerSize = 8 * 8;
1133 unsigned char header[headerSize];
1136 if (!
mread((
char*)
header, headerSize,
nullptr,
false))
1142 unsigned nEvts = fsize / (len * 64);
edm::EDGetTokenT< FEDRawDataCollection > inputRawCollectionToken_
MatacqProducer(const edm::ParameterSet ¶ms)
uint32_t getOrbitId(edm::Event &ev) const
struct MatacqProducer::stats_t stats_
bool getMatacqEvent(uint32_t runNumber, int32_t orbitId, bool fileChange)
std::map< uint32_t, uint32_t > orbitOffset_
const static stats_t stats_init
bool mread(char *buf, size_t n, const char *mess=nullptr, bool peek=false)
std::string rawInstanceName_
bool mcheck(const std::string &name)
bool getMatacqFile(uint32_t runNumber, uint32_t orbitId, bool *fileChange=nullptr)
bool mtell(filepos_t &pos)
MatacqDataFormatter formatter_
static unsigned getOrbitId(unsigned char *data, size_t size)
unsigned getRunNum() const
void produce(edm::Event &event, const edm::EventSetup &eventSetup) override
void init(MatacqProducer *mp)
TString units(TString variable, Char_t axis)
const static int orbitTolerance_
uint32_t openedFileRunNumber_
unsigned getDccLen() const
bool getOrbitRange(uint32_t &firstOrb, uint32_t &lastOrb)
static const StorageFactory * get(void)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
std::vector< std::string > fileNames_
bool check(const std::string &url, IOOffset *size=nullptr) const
std::string orbitOffsetFile_
static std::string runSubDir(uint32_t runNumber)
int onErrorDisablingEvtCnt_
uint32_t getRunNumber(edm::Event &ev) const
std::vector< unsigned char > data_
static const int matacqFedId_
size_t size() const
Lenght of the data buffer in bytes.
~MatacqProducer() override
void newRun(int prevRun, int newRun)
double nNonLaserEventsWithMatacq
void addMatacqData(edm::Event &event)
double nLaserEventsWithMatacq
int64_t pos(int orb) const
uint32_t getOrbitId() const
bool mseek(filepos_t offset, int whence=SEEK_SET, const char *mess=nullptr)
char data[epos_bytes_allocation]
unsigned long long uint64_t
Abs< T >::type abs(const T &t)
bool mopen(const std::string &name)
static const char runNumber_[]
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
static const int bufferSize
int getCalibTriggerType(edm::Event &ev) const
std::string digiInstanceName_