12 #include <boost/algorithm/string.hpp> 13 #include <boost/format.hpp> 25 #include <sys/types.h> 32 using namespace boost;
52 gettimeofday(&t,
nullptr);
55 strftime(buf,
sizeof(buf),
"%F %R %S s", localtime(&t.tv_sec));
56 buf[
sizeof(buf) - 1] = 0;
59 buf2 << buf <<
" " << ((t.tv_usec + 500) / 1000) <<
" ms";
65 : fileNames_(params.getParameter<
std::vector<
std::
string> >(
"fileNames")),
66 digiInstanceName_(params.getParameter<
string>(
"digiInstanceName")),
67 rawInstanceName_(params.getParameter<
string>(
"rawInstanceName")),
68 timing_(params.getUntrackedParameter<
bool>(
"timing",
false)),
69 disabled_(params.getParameter<
bool>(
"disabled")),
70 verbosity_(params.getUntrackedParameter<
int>(
"verbosity", 0)),
71 produceDigis_(params.getParameter<
bool>(
"produceDigis")),
72 produceRaw_(params.getParameter<
bool>(
"produceRaw")),
73 inputRawCollection_(params.getParameter<
edm::
InputTag>(
"inputRawCollection")),
74 mergeRaw_(params.getParameter<
bool>(
"mergeRaw")),
75 ignoreTriggerType_(params.getParameter<
bool>(
"ignoreTriggerType")),
79 openedFileRunNumber_(0),
81 fastRetrievalThresh_(0),
82 orbitOffsetFile_(params.getUntrackedParameter<
std::
string>(
"orbitOffsetFile",
"")),
85 logFileName_(params.getUntrackedParameter<
std::
string>(
"logFileName",
"matacqProducer.log")),
87 onErrorDisablingEvtCnt_(params.getParameter<
int>(
"onErrorDisablingEvtCnt")),
88 timeLogFile_(params.getUntrackedParameter<
std::
string>(
"timeLogFile",
"")),
91 cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer ctor" << endl;
93 gettimeofday(&
timer_,
nullptr);
98 cout <<
"[LaserSorter " <<
now() <<
"] " 99 <<
"Failed to open file " <<
timeLogFile_ <<
" to log timing.\n";
119 <<
"] registering new " 120 "EcalMatacqDigiCollection product with instance name '" 128 <<
"] registering new FEDRawDataCollection " 129 "product with instance name '" 142 cout <<
"[Matacq " <<
now() <<
"] exiting MatacqProducer ctor" << endl;
147 cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer::produce" << endl;
150 gettimeofday(&t,
nullptr);
152 timeLog_ << t.tv_sec <<
"." << setfill(
'0') << setw(3) << (t.tv_usec + 500) / 1000 << setfill(
' ') <<
"\t" 153 << (t.tv_usec -
timer_.tv_usec) * 1. + (t.tv_sec -
timer_.tv_sec) * 1.e6 <<
"\t";
170 gettimeofday(&t,
nullptr);
180 std::unique_ptr<FEDRawDataCollection> rawColl;
183 rawColl = std::make_unique<FEDRawDataCollection>(*sourceColl);
185 rawColl = std::make_unique<FEDRawDataCollection>();
189 auto digiColl = std::make_unique<EcalMatacqDigiCollection>();
204 LogInfo(
"Matacq") <<
"Run " << runNumber <<
"\t Orbit " << orbitId <<
"\n";
208 map<uint32_t, uint32_t>::iterator it =
orbitOffset_.find(runNumber);
210 LogWarning(
"Matacq") <<
"Orbit offset not found for run " << runNumber
211 <<
". No orbit correction will be applied.";
217 LogInfo(
"Matacq") <<
"Matacq data file found for " 218 <<
"run " << runNumber <<
" orbit " << orbitId;
226 LogWarning(
"Matacq") <<
" Error in Matacq event fragment length! " 229 <<
"Matacq data will not be included for this event.\n";
236 <<
" to dcc event with orbit id " << orbitId << std::endl;
244 LogWarning(
"Matacq") <<
"No matacq data found for laser event " 245 <<
"of run " << runNumber <<
" orbit " << orbitId;
249 LogWarning(
"Matacq") <<
"No matacq file found for event " <<
event.id();
257 <<
" next events will be skipped, following to an " 258 <<
"error on the last processed event, " 259 <<
"which is expected to be persistant.";
267 cout <<
"[Matacq " <<
now() <<
"] " 268 <<
"Adding FEDRawDataCollection collection " 275 cout <<
"[Matacq " <<
now() <<
"] " 276 <<
"Adding EcalMatacqDigiCollection collection " 330 if (!
mtell(startPos))
333 int32_t startOrb = -1;
334 const size_t headerSize = 8 * 8;
335 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
342 <<
"] Failed to read matacq header. Moved to start of " 346 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
351 cout <<
"[Matacq " <<
now() <<
"] Looks like matacq file is empty" 358 cout <<
"[Matacq " <<
now() <<
"] Last read orbit: " <<
lastOrb_ <<
" looking for orbit " << orbitId
359 <<
". Current file position: " << startPos <<
" Orbit at current position: " << startOrb <<
"\n";
362 bool didCoarseMove =
false;
377 pos = ((int64_t)fsize / evtSize - 1) * evtSize;
380 <<
"] Estimated position was beyond end of file. " 390 cout <<
"[Matacq " <<
now() <<
"] jumping to estimated position " << pos <<
"\n";
391 mseek(pos, SEEK_SET,
"Jumping to estimated event position");
392 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
393 didCoarseMove =
true;
397 didCoarseMove =
false;
398 if (!
mread((
char*)&
data_[0], headerSize,
"Reading event header",
true)) {
405 <<
"] Event orbit outside of orbit range " 406 "of matacq data file events\n";
418 fastRetrievalThresh_ = 2 *
abs(orb - orbitId);
420 cout <<
" to " << fastRetrievalThresh_ <<
"\n";
424 if (startOrb > 0 && (
abs(orb - orbitId) >
abs(startOrb - orbitId))) {
426 cout <<
"[Matacq " <<
now() <<
"] Estimation (-> orbit " << orb
428 "was worst than original position (-> orbit " 429 << startOrb <<
"). Restoring position (" << startPos <<
").\n";
430 mseek(startPos, SEEK_SET);
431 mread((
char*)&
data_[0], headerSize,
"Reading event header",
true);
436 bool searchBackward = (orb > orbitId) ?
true :
false;
442 cout <<
"[Matacq " <<
now() <<
"] read DCC length is null! Cancels matacq event search " 443 <<
" and move matacq file pointer to beginning of the file. " 444 <<
"(" << __FILE__ <<
":" << __LINE__ <<
")." 451 enum state_t { searching,
found,
failed } state = searching;
453 while (state == searching) {
460 cout <<
"[Matacq " <<
now() <<
"] Header read at file position " << pos <<
": orbit = " << orb
461 <<
" len = " << len <<
"x8 Byte" 462 <<
" run = " << run <<
"\n";
468 if ((
int)
data_.size() < len * 8) {
473 <<
"] Event found. Reading " 476 if (!
mread((
char*)&
data_[0], len * 8,
"Reading matacq event")) {
478 cout <<
"[Matacq " <<
now() <<
"] Failed to read matacq event." 484 if ((searchBackward && (orb < orbitId)) || (!searchBackward && (orb > orbitId))) {
488 cout <<
"[Matacq " <<
now() <<
"] No matacq data found for run " << run <<
", orbit ID " << orbitId <<
"." 491 off_t
offset = (searchBackward ? -len : len) * 8;
494 cout <<
"[Matacq " <<
now() <<
"] In matacq file, moving " <<
abs(offset) <<
" byte " 495 << (offset > 0 ?
"forward" :
"backward") <<
".\n";
498 if (
mseek(offset, SEEK_CUR, (searchBackward ?
"Moving to previous event" :
"Moving to next event")) &&
499 mread((
char*)&
data_[0], headerSize,
"Reading event header",
true)) {
502 mseek(-len * 8, SEEK_CUR,
"Moving to start of last complete event");
509 if (state ==
found) {
514 if (pos == fsize - 1) {
517 <<
"] Event found was at the end of the file. Moving " 518 "stream position to beginning of this event." 521 mseek(-(
int)len * 8 - 1, SEEK_CUR,
"Moving to beginning of last matacq event");
524 return (state ==
found);
529 uint32_t firstOrb, lastOrb;
532 if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
533 if (fileChange !=
nullptr)
542 const string runNumberFormat =
"%08d{,_*}";
551 for (
int itry = 0; itry < 2 && (orbitId > maxOrb); ++itry) {
554 std::cout <<
"[Matacq " <<
now() <<
"] Event orbit id (" << orbitId
556 "beyound the range of available one. Waiting for " 558 <<
" seconds in case " 559 "it was not written yet to disk.";
565 boost::algorithm::replace_all(fname,
"%run_subdir%",
runSubDir(runNumber));
566 boost::algorithm::replace_all(fname,
"%run_number%", sRunNumber);
569 int rc = glob(fname.c_str(), GLOB_BRACE,
nullptr, &
g);
575 <<
"] Running out of memory while calling glob function to look for matacq file paths\n";
579 <<
"] Read error while calling glob function to look for matacq file paths\n";
588 for (
unsigned iglob = 0; iglob < g.gl_pathc; ++iglob) {
589 char* thePath = g.gl_pathv[iglob];
591 static std::atomic<int> nOpenErrors{0};
592 const int maxOpenErrors = 50;
593 if (!
mopen(thePath) && nOpenErrors < maxOpenErrors) {
594 std::cout <<
"[Matacq " <<
now() <<
"] Failed to open file " << thePath;
596 if (nOpenErrors == maxOpenErrors) {
597 std::cout << nOpenErrors <<
"This is the " << maxOpenErrors
598 <<
"th occurence of this error. Report of this error is now disabled.\n";
606 if (goodRange && lastOrb > maxOrb)
608 if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
613 std::cout <<
"[Matacq " <<
now() <<
"] Switching to file " << fname <<
"\n";
622 LogInfo(
"Matacq") <<
"Uses matacq data file: '" << fname <<
"'\n";
626 <<
"] no matacq file found " 628 << runNumber <<
"\n";
631 if (fileChange !=
nullptr)
640 if (fileChange !=
nullptr)
658 throw cms::Exception(
"NotFound") <<
"No FED raw data collection found. ECAL raw data are " 659 "required to retrieve the orbit ID";
663 for (
int id = 601;
id <= 654; ++
id) {
667 const int orbitIdOffset64 = 3;
668 if (data.
size() >= 8 * (orbitIdOffset64 + 1)) {
669 const unsigned char* pOrbit = data.
data() + orbitIdOffset64 * 8;
670 int thisOrbit = pOrbit[0] | (pOrbit[1] << 8) | (pOrbit[2] << 16) | (pOrbit[3] << 24);
674 LogWarning(
"EventCorruption") <<
"Orbit ID inconsitency in DCC headers";
686 LogWarning(
"NotFound") <<
"Failed to retrieve orbit ID of event " << ev.
id();
695 throw cms::Exception(
"NotFound") <<
"No FED raw data collection found. ECAL raw data are " 696 "required to retrieve the trigger type";
700 for (
int id = 601;
id <= 654; ++
id) {
704 const int detailedTrigger32 = 5;
705 if (data.
size() >= 4 * (detailedTrigger32 + 1)) {
706 const unsigned char* pTType = data.
data() + detailedTrigger32 * 4;
707 int tType = pTType[1] & 0x7;
712 int tType = stat.
result(&p);
715 LogWarning(
"NotFound") <<
"No ECAL DCC data found\n";
720 LogWarning(
"EventCorruption") <<
"Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
729 const size_t headerSize = 8 * 8;
730 unsigned char data[headerSize];
731 if (!mp->
mread((
char*)data, headerSize)) {
733 cout <<
"[Matacq " <<
now() <<
"] reached end of file!\n";
734 firstOrbit_ = eventLength_ = orbitStepMean_ = 0;
740 cout <<
"[Matacq " <<
now() <<
"] First event orbit: " << firstOrbit_ <<
" event length: " << eventLength_
746 if (eventLength_ == 0) {
748 cout <<
"[Matacq " <<
now() <<
"] event length is null!" << endl;
756 const unsigned nEvents = s / eventLength_ / 8;
760 cout <<
"[Matacq " <<
now() <<
"] File is empty!" << endl;
766 cout <<
"[Matacq " <<
now() <<
"] File size: " << s <<
" Number of events: " << nEvents << endl;
769 off_t
last = (nEvents - 1) * (off_t)eventLength_ * 8;
772 "Moving to beginning of last complete " 774 if (!mp->
mread((
char*)data, headerSize,
"Reading matacq header",
true)) {
775 LogWarning(
"Matacq") <<
"Fast matacq event retrieval failure. " 776 "Falling back to safe retrieval mode.";
784 cout <<
"[Matacq " <<
now() <<
"] Last event orbit: " << lastOrb <<
" last event length: " << lastLen << endl;
787 if (lastLen != eventLength_) {
790 <<
"Fast matacq event retrieval failure: it looks like " 791 "the matacq file contains events of different sizes.";
794 orbitStepMean_ = 112;
798 orbitStepMean_ = (lastOrb - firstOrbit_) / nEvents;
801 cout <<
"[Matacq " <<
now() <<
"] Orbit step mean: " << orbitStepMean_ <<
"\n";
807 if (orb < firstOrbit_)
809 uint64_t r = orbitStepMean_ != 0 ? (((
uint64_t)(orb - firstOrbit_)) / orbitStepMean_) * eventLength_ * 8 : 0;
811 cout <<
"[Matacq " <<
now() <<
"] Estimated Position for orbit " << orb <<
": " << r << endl;
818 gettimeofday(&t,
nullptr);
822 <<
"] Time elapsed between first event and " 823 "destruction of MatacqProducer: " 834 cout <<
"[Matacq " <<
now() <<
"] " 835 <<
"Offset to substract to Matacq events Orbit ID: \n" 836 <<
"#Run Number\t Offset\n";
858 cout << run <<
"\t" << orbit <<
"\n";
863 #ifdef USE_STORAGE_MANAGER 869 if (whence == SEEK_SET)
871 else if (whence == SEEK_CUR)
873 else if (whence == SEEK_END)
876 throw cms::Exception(
"Bug") <<
"Bug found in " << __FILE__ <<
": " << __LINE__ <<
"\n";
881 cout <<
"[Matacq " <<
now() <<
"] ";
883 cout << mess <<
". ";
884 cout <<
"Random access error on input matacq file. ";
885 if (whence == SEEK_SET)
886 cout <<
"Failed to seek absolute position " <<
offset;
887 else if (whence == SEEK_CUR)
888 cout <<
"Failed to move " << offset <<
" bytes forward";
889 else if (whence == SEEK_END)
890 cout <<
"Failed to seek position at " << offset <<
" bytes before end of file";
891 cout <<
". Reopening file. " << e.
what() <<
"\n";
919 cout <<
"[Matacq " <<
now() <<
"] ";
921 cout << mess <<
". ";
922 cout <<
"Read failure from input matacq file: " << e.
what() <<
"\n";
950 cout <<
"Exception cautgh while rewinding file " <<
inFileName_ <<
": " << e.
what() <<
". " 951 <<
"File will be reopened.";
990 #else //USE_STORAGE_MANAGER not defined 994 const int rc = fseeko(
inFile_, offset, whence);
996 cout <<
"[Matacq " <<
now() <<
"] ";
998 cout << mess <<
". ";
999 cout <<
"Random access error on input matacq file. " 1017 bool rc = (pos != -1) && (1 == fread(buf, n, 1,
inFile_));
1020 cout <<
"[Matacq " <<
now() <<
"] ";
1022 cout << mess <<
". ";
1023 cout <<
"Read failure from input matacq file.\n";
1028 if (0 != fseeko(
inFile_, pos, SEEK_SET)) {
1030 cout <<
"[Matacq " <<
now() <<
"] ";
1032 cout << mess <<
". ";
1033 cout <<
"Failed to restore file position of " 1034 "before read error. Rewind file.\n";
1048 if (0 != fstat(fileno(
inFile_), &buf)) {
1061 return fseeko(
inFile_, 0, SEEK_SET) != 0;
1066 return 0 ==
stat(name.c_str(), &dummy);
1080 inFile_ = fopen(name.c_str(),
"r");
1104 #endif //USE_STORAGE_MANAGER defined 1107 int millions = runNumber / (1000 * 1000);
1108 int thousands = (runNumber - millions * 1000 * 1000) / 1000;
1109 int units = runNumber - millions * 1000 * 1000 - thousands * 1000;
1110 return str(
boost::format(
"%03d/%03d/%03d") % millions % thousands % units);
1132 const unsigned headerSize = 8 * 8;
1133 unsigned char header[headerSize];
1136 if (!
mread((
char*)header, headerSize,
nullptr,
false))
1142 unsigned nEvts = fsize / (len * 64);
1144 filepos_t lastEvtPos = (nEvts - 1) * len * 64;
1146 mread((
char*)header, headerSize,
nullptr,
false);
static const char runNumber_[]
T getParameter(std::string const &) const
~MatacqProducer() override
static const int matacqFedId_
static const int orbitTolerance_
unsigned getRunNum() const
bool getByToken(EDGetToken token, Handle< PROD > &result) const
bool getOrbitRange(uint32_t &firstOrb, uint32_t &lastOrb)
static unsigned getOrbitId(unsigned char *data, size_t size)
std::vector< std::string > fileNames_
char const * what() const override
bool check(const std::string &url, IOOffset *size=nullptr) const
std::string digiInstanceName_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
size_t size() const
Length of the data buffer in bytes.
uint32_t getRunNumber(edm::Event &ev) const
uint32_t getOrbitId() const
unsigned getDccLen() const
uint32_t getOrbitId(edm::Event &ev) const
bool mcheck(const std::string &name)
struct MatacqProducer::stats_t stats_
static const StorageFactory * get(void)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
int64_t pos(int orb) const
std::map< uint32_t, uint32_t > orbitOffset_
edm::EDGetTokenT< FEDRawDataCollection > inputRawCollectionToken_
void addMatacqData(edm::Event &event)
double nNonLaserEventsWithMatacq
static std::string runSubDir(uint32_t runNumber)
int onErrorDisablingEvtCnt_
bool mread(char *buf, size_t n, const char *mess=0, bool peek=false)
Abs< T >::type abs(const T &t)
double nLaserEventsWithMatacq
std::string orbitOffsetFile_
void produce(edm::Event &event, const edm::EventSetup &eventSetup) override
int getCalibTriggerType(edm::Event &ev) const
unsigned long long uint64_t
MatacqDataFormatter formatter_
static const uint16_t invalid_
uint32_t openedFileRunNumber_
void init(MatacqProducer *mp)
bool mtell(filepos_t &pos)
static const int bufferSize
MatacqProducer(const edm::ParameterSet ¶ms)
TString units(TString variable, Char_t axis)
char data[epos_bytes_allocation]
bool mseek(filepos_t offset, int whence=SEEK_SET, const char *mess=0)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
std::string rawInstanceName_
T result(double *proba) const
std::vector< unsigned char > data_
void newRun(int prevRun, int newRun)
virtual void rewind(void)
bool getMatacqFile(uint32_t runNumber, uint32_t orbitId, bool *fileChange=0)
static const stats_t stats_init
bool mopen(const std::string &name)
std::unique_ptr< Storage > open(const std::string &url, int mode=IOFlags::OpenRead) const
bool getMatacqEvent(uint32_t runNumber, int32_t orbitId, bool fileChange)