3 #ifdef USE_STORAGE_MANAGER
6 #else //USE_STORAGE_MANAGER not defined
7 #ifndef _LARGEFILE64_SOURCE
8 #define _LARGEFILE64_SOURCE
9 #endif //_LARGEFILE64_SOURCE not defined
10 #define _FILE_OFFSET_BITS 64
12 #endif //USE_STORAGE_MANAGER defined
42 #ifdef USE_STORAGE_MANAGER
44 typedef std::unique_ptr<Storage>
FILE_t;
// Takes an orbit number and returns a signed 64-bit value; declared const, so
// it does not modify the object it is called on.
// NOTE(review): the definition is not visible next to this declaration. The
// code that prints "Estimated Position for orbit" computes
// ((orb - firstOrbit_) / orbitStepMean_) * eventLength_ * 8 (0 when
// orbitStepMean_ is 0) and is presumably this function's body, i.e. an
// estimated byte position in the matacq file — confirm against the definition.
86 int64_t
pos(
int orb)
const;
// Reads n bytes from the input matacq file into buf.
// Returns true when the read succeeded; callers treat a false return as a
// read failure or end of file.
//   buf  - destination buffer; the visible callers pass a buffer of at least
//          n bytes.
//   n    - number of bytes to read.
//   mess - optional context text (default nullptr); appears to be printed,
//          followed by ". ", ahead of the read-failure message.
//   peek - default false; callers pass true when reading event headers.
//          NOTE(review): presumably the file position is left unchanged after
//          a peek read — confirm against the definition.
163 bool mread(
char*
buf,
size_t n,
const char* mess =
nullptr,
bool peek =
false);
321 #include <sys/stat.h>
322 #include <sys/types.h>
325 #include <fmt/printf.h>
326 #include <boost/algorithm/string.hpp>
340 using namespace boost;
360 gettimeofday(&t,
nullptr);
363 strftime(buf,
sizeof(buf),
"%F %R %S s", localtime(&t.tv_sec));
364 buf[
sizeof(
buf) - 1] = 0;
367 buf2 << buf <<
" " << ((t.tv_usec + 500) / 1000) <<
" ms";
373 : fileNames_(params.getParameter<std::
vector<std::
string> >(
"fileNames")),
374 digiInstanceName_(params.getParameter<
string>(
"digiInstanceName")),
375 rawInstanceName_(params.getParameter<
string>(
"rawInstanceName")),
376 timing_(params.getUntrackedParameter<bool>(
"timing",
false)),
377 disabled_(params.getParameter<bool>(
"disabled")),
378 verbosity_(params.getUntrackedParameter<int>(
"verbosity", 0)),
379 produceDigis_(params.getParameter<bool>(
"produceDigis")),
380 produceRaw_(params.getParameter<bool>(
"produceRaw")),
381 inputRawCollection_(params.getParameter<edm::
InputTag>(
"inputRawCollection")),
382 mergeRaw_(params.getParameter<bool>(
"mergeRaw")),
383 ignoreTriggerType_(params.getParameter<bool>(
"ignoreTriggerType")),
387 openedFileRunNumber_(0),
389 fastRetrievalThresh_(0),
390 orbitOffsetFile_(params.getUntrackedParameter<std::
string>(
"orbitOffsetFile",
"")),
393 logFileName_(params.getUntrackedParameter<std::
string>(
"logFileName",
"matacqProducer.log")),
394 eventSkipCounter_(0),
395 onErrorDisablingEvtCnt_(params.getParameter<int>(
"onErrorDisablingEvtCnt")),
396 timeLogFile_(params.getUntrackedParameter<std::
string>(
"timeLogFile",
"")),
399 cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer ctor" << endl;
401 gettimeofday(&
timer_,
nullptr);
406 cout <<
"[LaserSorter " <<
now() <<
"] "
407 <<
"Failed to open file " <<
timeLogFile_ <<
" to log timing.\n";
427 <<
"] registering new "
428 "EcalMatacqDigiCollection product with instance name '"
436 <<
"] registering new FEDRawDataCollection "
437 "product with instance name '"
450 cout <<
"[Matacq " <<
now() <<
"] exiting MatacqProducer ctor" << endl;
455 cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer::produce" << endl;
458 gettimeofday(&t,
nullptr);
460 timeLog_ << t.tv_sec <<
"." << setfill(
'0') << setw(3) << (t.tv_usec + 500) / 1000 << setfill(
' ') <<
"\t"
461 << (t.tv_usec -
timer_.tv_usec) * 1. + (t.tv_sec -
timer_.tv_sec) * 1.e6 <<
"\t";
478 gettimeofday(&t,
nullptr);
488 std::unique_ptr<FEDRawDataCollection> rawColl;
491 rawColl = std::make_unique<FEDRawDataCollection>(*sourceColl);
493 rawColl = std::make_unique<FEDRawDataCollection>();
497 auto digiColl = std::make_unique<EcalMatacqDigiCollection>();
512 LogInfo(
"Matacq") <<
"Run " << runNumber <<
"\t Orbit " << orbitId <<
"\n";
516 map<uint32_t, uint32_t>::iterator it =
orbitOffset_.find(runNumber);
518 LogWarning(
"Matacq") <<
"Orbit offset not found for run " << runNumber
519 <<
". No orbit correction will be applied.";
525 LogInfo(
"Matacq") <<
"Matacq data file found for "
526 <<
"run " << runNumber <<
" orbit " << orbitId;
534 LogWarning(
"Matacq") <<
" Error in Matacq event fragment length! "
537 <<
"Matacq data will not be included for this event.\n";
544 <<
" to dcc event with orbit id " << orbitId << std::endl;
552 LogWarning(
"Matacq") <<
"No matacq data found for laser event "
553 <<
"of run " << runNumber <<
" orbit " << orbitId;
557 LogWarning(
"Matacq") <<
"No matacq file found for event " <<
event.id();
565 <<
" next events will be skipped, following to an "
566 <<
"error on the last processed event, "
567 <<
"which is expected to be persistant.";
575 cout <<
"[Matacq " <<
now() <<
"] "
576 <<
"Adding FEDRawDataCollection collection "
583 cout <<
"[Matacq " <<
now() <<
"] "
584 <<
"Adding EcalMatacqDigiCollection collection "
638 if (!
mtell(startPos))
641 int32_t startOrb = -1;
642 const size_t headerSize = 8 * 8;
643 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
650 <<
"] Failed to read matacq header. Moved to start of "
654 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
659 cout <<
"[Matacq " <<
now() <<
"] Looks like matacq file is empty"
666 cout <<
"[Matacq " <<
now() <<
"] Last read orbit: " <<
lastOrb_ <<
" looking for orbit " << orbitId
667 <<
". Current file position: " << startPos <<
" Orbit at current position: " << startOrb <<
"\n";
670 bool didCoarseMove =
false;
685 pos = ((int64_t)fsize / evtSize - 1) * evtSize;
688 <<
"] Estimated position was beyond end of file. "
698 cout <<
"[Matacq " <<
now() <<
"] jumping to estimated position " << pos <<
"\n";
699 mseek(pos, SEEK_SET,
"Jumping to estimated event position");
700 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
701 didCoarseMove =
true;
705 didCoarseMove =
false;
706 if (!
mread((
char*)&
data_[0], headerSize,
"Reading event header",
true)) {
713 <<
"] Event orbit outside of orbit range "
714 "of matacq data file events\n";
726 fastRetrievalThresh_ = 2 *
abs(orb - orbitId);
728 cout <<
" to " << fastRetrievalThresh_ <<
"\n";
732 if (startOrb > 0 && (
abs(orb - orbitId) >
abs(startOrb - orbitId))) {
734 cout <<
"[Matacq " <<
now() <<
"] Estimation (-> orbit " << orb
736 "was worst than original position (-> orbit "
737 << startOrb <<
"). Restoring position (" << startPos <<
").\n";
738 mseek(startPos, SEEK_SET);
739 mread((
char*)&
data_[0], headerSize,
"Reading event header",
true);
744 bool searchBackward = (orb > orbitId) ?
true :
false;
750 cout <<
"[Matacq " <<
now() <<
"] read DCC length is null! Cancels matacq event search "
751 <<
" and move matacq file pointer to beginning of the file. "
752 <<
"(" << __FILE__ <<
":" << __LINE__ <<
")."
759 enum state_t { searching,
found, failed }
state = searching;
761 while (
state == searching) {
768 cout <<
"[Matacq " <<
now() <<
"] Header read at file position " << pos <<
": orbit = " << orb
769 <<
" len = " << len <<
"x8 Byte"
770 <<
" run = " << run <<
"\n";
776 if ((
int)
data_.size() < len * 8) {
781 <<
"] Event found. Reading "
784 if (!
mread((
char*)&
data_[0], len * 8,
"Reading matacq event")) {
786 cout <<
"[Matacq " <<
now() <<
"] Failed to read matacq event."
792 if ((searchBackward && (orb < orbitId)) || (!searchBackward && (orb > orbitId))) {
796 cout <<
"[Matacq " <<
now() <<
"] No matacq data found for run " << run <<
", orbit ID " << orbitId <<
"."
799 off_t
offset = (searchBackward ? -len : len) * 8;
802 cout <<
"[Matacq " <<
now() <<
"] In matacq file, moving " <<
abs(offset) <<
" byte "
803 << (offset > 0 ?
"forward" :
"backward") <<
".\n";
806 if (
mseek(offset, SEEK_CUR, (searchBackward ?
"Moving to previous event" :
"Moving to next event")) &&
807 mread((
char*)&
data_[0], headerSize,
"Reading event header",
true)) {
810 mseek(-len * 8, SEEK_CUR,
"Moving to start of last complete event");
822 if (pos == fsize - 1) {
825 <<
"] Event found was at the end of the file. Moving "
826 "stream position to beginning of this event."
829 mseek(-(
int)len * 8 - 1, SEEK_CUR,
"Moving to beginning of last matacq event");
837 uint32_t firstOrb, lastOrb;
840 if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
841 if (fileChange !=
nullptr)
850 const string runNumberFormat =
"%08d{,_*}";
851 string sRunNumber = fmt::sprintf(runNumberFormat, runNumber);
860 for (
int itry = 0; itry < 1 && (orbitId > maxOrb); ++itry) {
863 std::cout <<
"[Matacq " <<
now() <<
"] Event orbit id (" << orbitId
865 "beyound the range of available one. Waiting for "
867 <<
" seconds in case "
868 "it was not written yet to disk.";
874 boost::algorithm::replace_all(fname,
"%run_subdir%",
runSubDir(runNumber));
875 boost::algorithm::replace_all(fname,
"%run_number%", sRunNumber);
878 int rc = glob(fname.c_str(), GLOB_BRACE,
nullptr, &
g);
884 <<
"] Running out of memory while calling glob function to look for matacq file paths\n";
888 <<
"] Read error while calling glob function to look for matacq file paths\n";
897 for (
unsigned iglob = 0; iglob < g.gl_pathc; ++iglob) {
898 char* thePath = g.gl_pathv[iglob];
900 static std::atomic<int> nOpenErrors{0};
901 const int maxOpenErrors = 50;
902 if (!
mopen(thePath) && nOpenErrors < maxOpenErrors) {
903 std::cout <<
"[Matacq " <<
now() <<
"] Failed to open file " << thePath;
905 if (nOpenErrors == maxOpenErrors) {
906 std::cout << nOpenErrors <<
"This is the " << maxOpenErrors
907 <<
"th occurence of this error. Report of this error is now disabled.\n";
915 std::cout <<
"Get orbit range " << (goodRange ?
"succeeded" :
"failed") <<
". Range: " << firstOrb <<
"..."
917 if (goodRange && lastOrb > maxOrb)
919 if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
924 std::cout <<
"[Matacq " <<
now() <<
"] Switching to file " << fname <<
"\n";
933 LogInfo(
"Matacq") <<
"Uses matacq data file: '" << fname <<
"'\n";
937 <<
"] no matacq file found "
939 << runNumber <<
", orbit " << orbitId <<
"\n";
942 if (fileChange !=
nullptr)
951 if (fileChange !=
nullptr)
969 throw cms::Exception(
"NotFound") <<
"No FED raw data collection found. ECAL raw data are "
970 "required to retrieve the orbit ID";
974 for (
int id = 601;
id <= 654; ++
id) {
978 const int orbitIdOffset64 = 3;
979 if (data.
size() >= 8 * (orbitIdOffset64 + 1)) {
980 const unsigned char* pOrbit = data.
data() + orbitIdOffset64 * 8;
981 int thisOrbit = pOrbit[0] | (pOrbit[1] << 8) | (pOrbit[2] << 16) | (pOrbit[3] << 24);
985 LogWarning(
"EventCorruption") <<
"Orbit ID inconsitency in DCC headers";
997 LogWarning(
"NotFound") <<
"Failed to retrieve orbit ID of event " << ev.
id();
1006 throw cms::Exception(
"NotFound") <<
"No FED raw data collection found. ECAL raw data are "
1007 "required to retrieve the trigger type";
1011 for (
int id = 601;
id <= 654; ++
id) {
1015 const int detailedTrigger32 = 5;
1016 if (data.
size() >= 4 * (detailedTrigger32 + 1)) {
1017 const unsigned char* pTType = data.
data() + detailedTrigger32 * 4;
1018 int tType = pTType[1] & 0x7;
1023 int tType = stat.
result(&p);
1026 LogWarning(
"NotFound") <<
"No ECAL DCC data found\n";
1031 LogWarning(
"EventCorruption") <<
"Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
1040 const size_t headerSize = 8 * 8;
1041 unsigned char data[headerSize];
1042 if (!mp->
mread((
char*)data, headerSize)) {
1044 cout <<
"[Matacq " <<
now() <<
"] reached end of file!\n";
1059 cout <<
"[Matacq " <<
now() <<
"] event length is null!" << endl;
1068 cout <<
"[Matacq " <<
now() <<
"] File is missing!" << endl;
1071 }
else if (s == 0) {
1073 cout <<
"[Matacq " <<
now() <<
"] File is empty!" << endl;
1082 cout <<
"[Matacq " <<
now() <<
"] File size: " << s <<
" Number of events: " << nEvents << endl;
1088 "Moving to beginning of last complete "
1090 if (!mp->
mread((
char*)data, headerSize,
"Reading matacq header",
true)) {
1091 LogWarning(
"Matacq") <<
"Fast matacq event retrieval failure. "
1092 "Falling back to safe retrieval mode.";
1100 cout <<
"[Matacq " <<
now() <<
"] Last event orbit: " << lastOrb <<
" last event length: " << lastLen << endl;
1106 <<
"Fast matacq event retrieval failure: it looks like "
1107 "the matacq file contains events of different sizes.";
1123 if (orb < firstOrbit_)
1125 uint64_t r = orbitStepMean_ != 0 ? (((
uint64_t)(orb - firstOrbit_)) / orbitStepMean_) * eventLength_ * 8 : 0;
1127 cout <<
"[Matacq " <<
now() <<
"] Estimated Position for orbit " << orb <<
": " << r << endl;
1134 gettimeofday(&t,
nullptr);
1138 <<
"] Time elapsed between first event and "
1139 "destruction of MatacqProducer: "
1150 cout <<
"[Matacq " <<
now() <<
"] "
1151 <<
"Offset to substract to Matacq events Orbit ID: \n"
1152 <<
"#Run Number\t Offset\n";
1174 cout << run <<
"\t" << orbit <<
"\n";
1179 #ifdef USE_STORAGE_MANAGER
1184 Storage::Relative wh;
1185 if (whence == SEEK_SET)
1187 else if (whence == SEEK_CUR)
1188 wh = Storage::CURRENT;
1189 else if (whence == SEEK_END)
1192 throw cms::Exception(
"Bug") <<
"Bug found in " << __FILE__ <<
": " << __LINE__ <<
"\n";
1194 inFile_->position(offset, wh);
1197 cout <<
"[Matacq " <<
now() <<
"] ";
1199 cout << mess <<
". ";
1200 cout <<
"Random access error on input matacq file. ";
1201 if (whence == SEEK_SET)
1202 cout <<
"Failed to seek absolute position " <<
offset;
1203 else if (whence == SEEK_CUR)
1204 cout <<
"Failed to move " << offset <<
" bytes forward";
1205 else if (whence == SEEK_END)
1206 cout <<
"Failed to seek position at " << offset <<
" bytes before end of file";
1207 cout <<
". Reopening file. " << e.
what() <<
"\n";
1232 rc = (n ==
inFile_->xread(buf, n));
1235 cout <<
"[Matacq " <<
now() <<
"] ";
1237 cout << mess <<
". ";
1238 cout <<
"Read failure from input matacq file: " << e.
what() <<
"\n";
1266 cout <<
"Exception cautgh while rewinding file " <<
inFileName_ <<
": " << e.
what() <<
". "
1267 <<
"File will be reopened.";
1306 #else //USE_STORAGE_MANAGER not defined
1310 const int rc = fseeko(
inFile_, offset, whence);
1312 cout <<
"[Matacq " <<
now() <<
"] ";
1314 cout << mess <<
". ";
1315 cout <<
"Random access error on input matacq file. "
1333 bool rc = (pos != -1) && (1 == fread(buf, n, 1,
inFile_));
1336 cout <<
"[Matacq " <<
now() <<
"] ";
1338 cout << mess <<
". ";
1339 cout <<
"Read failure from input matacq file.\n";
1344 if (0 != fseeko(
inFile_, pos, SEEK_SET)) {
1346 cout <<
"[Matacq " <<
now() <<
"] ";
1348 cout << mess <<
". ";
1349 cout <<
"Failed to restore file position of "
1350 "before read error. Rewind file.\n";
1364 if (0 != fstat(fileno(
inFile_), &buf)) {
1377 return fseeko(
inFile_, 0, SEEK_SET) != 0;
1382 return 0 ==
stat(name.c_str(), &dummy);
1396 inFile_ = fopen(name.c_str(),
"r");
1420 #endif //USE_STORAGE_MANAGER defined
1423 int millions = runNumber / (1000 * 1000);
1424 int thousands = (runNumber - millions * 1000 * 1000) / 1000;
1425 int units = runNumber - millions * 1000 * 1000 - thousands * 1000;
1426 return fmt::sprintf(
"%03d/%03d/%03d", millions, thousands, units);
1448 const unsigned headerSize = 8 * 8;
1449 unsigned char header[headerSize];
1452 if (!
mread((
char*)header, headerSize,
nullptr,
false))
1458 unsigned nEvts = fsize / (len * 8);
1468 mread((
char*)header, headerSize,
nullptr,
false);
static const char runNumber_[]
~MatacqProducer() override
static const int matacqFedId_
static const int orbitTolerance_
uint16_t *__restrict__ id
bool mseek(filepos_t offset, int whence=SEEK_SET, const char *mess=nullptr)
unsigned getRunNum() const
bool getByToken(EDGetToken token, Handle< PROD > &result) const
#define DEFINE_FWK_MODULE(type)
bool getMatacqFile(uint32_t runNumber, uint32_t orbitId, bool *fileChange=nullptr)
bool operator<(const MatacqEventId &a)
bool getOrbitRange(uint32_t &firstOrb, uint32_t &lastOrb)
static unsigned getOrbitId(unsigned char *data, size_t size)
std::vector< std::string > fileNames_
std::string digiInstanceName_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
size_t size() const
Length of the data buffer in bytes.
uint32_t getRunNumber(edm::Event &ev) const
uint32_t getOrbitId() const
unsigned getDccLen() const
uint32_t getOrbitId(edm::Event &ev) const
bool mcheck(const std::string &name)
struct MatacqProducer::stats_t stats_
bool mread(char *buf, size_t n, const char *mess=nullptr, bool peek=false)
int64_t pos(int orb) const
std::map< uint32_t, uint32_t > orbitOffset_
edm::EDGetTokenT< FEDRawDataCollection > inputRawCollectionToken_
void addMatacqData(edm::Event &event)
double nNonLaserEventsWithMatacq
static std::string runSubDir(uint32_t runNumber)
int onErrorDisablingEvtCnt_
char const * what() const noexcept override
Abs< T >::type abs(const T &t)
edm::InputTag inputRawCollection_
double nLaserEventsWithMatacq
std::string orbitOffsetFile_
void produce(edm::Event &event, const edm::EventSetup &eventSetup) override
int getCalibTriggerType(edm::Event &ev) const
Log< level::Info, false > LogInfo
bool operator>(const MatacqEventId &a)
unsigned long long uint64_t
MatacqDataFormatter formatter_
NullOut & operator<<(std::ostream &(*pf)(std::ostream &))
T getParameter(std::string const &) const
bool operator==(const MatacqEventId &a)
uint32_t openedFileRunNumber_
MatacqEventId(uint32_t r, uint32_t o)
void init(MatacqProducer *mp)
bool mtell(filepos_t &pos)
static const int bufferSize
MatacqProducer(const edm::ParameterSet &params)
TString units(TString variable, Char_t axis)
char data[epos_bytes_allocation]
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
std::string rawInstanceName_
Log< level::Warning, false > LogWarning
T result(double *proba) const
std::vector< unsigned char > data_
void newRun(int prevRun, int newRun)
static const stats_t stats_init
NullOut & operator<<(const T &a)
bool mopen(const std::string &name)
bool getMatacqEvent(uint32_t runNumber, int32_t orbitId, bool fileChange)