3 #ifdef USE_STORAGE_MANAGER 6 #else //USE_STORAGE_MANAGER not defined 7 #ifndef _LARGEFILE64_SOURCE 8 #define _LARGEFILE64_SOURCE 9 #endif //_LARGEFILE64_SOURCE not defined 10 #define _FILE_OFFSET_BITS 64 12 #endif //USE_STORAGE_MANAGER defined 42 #ifdef USE_STORAGE_MANAGER 44 typedef std::unique_ptr<Storage>
FILE_t;
62 return (this->run <
a.run) || ((this->run ==
a.run) && (this->orbit <
a.orbit));
66 return (this->run >
a.run) || ((this->run ==
a.run) && (this->orbit >
a.orbit));
86 int64_t
pos(
int orb)
const;
163 bool mread(
char*
buf,
size_t n,
const char* mess =
nullptr,
bool peek =
false);
321 #include <sys/stat.h> 322 #include <sys/types.h> 325 #include <fmt/printf.h> 326 #include <boost/algorithm/string.hpp> 340 using namespace boost;
360 gettimeofday(&
t,
nullptr);
363 strftime(
buf,
sizeof(
buf),
"%F %R %S s", localtime(&
t.tv_sec));
367 buf2 <<
buf <<
" " << ((
t.tv_usec + 500) / 1000) <<
" ms";
374 digiInstanceName_(
params.getParameter<
string>(
"digiInstanceName")),
375 rawInstanceName_(
params.getParameter<
string>(
"rawInstanceName")),
377 disabled_(
params.getParameter<
bool>(
"disabled")),
378 verbosity_(
params.getUntrackedParameter<
int>(
"verbosity", 0)),
379 produceDigis_(
params.getParameter<
bool>(
"produceDigis")),
380 produceRaw_(
params.getParameter<
bool>(
"produceRaw")),
382 mergeRaw_(
params.getParameter<
bool>(
"mergeRaw")),
383 ignoreTriggerType_(
params.getParameter<
bool>(
"ignoreTriggerType")),
387 openedFileRunNumber_(0),
389 fastRetrievalThresh_(0),
390 orbitOffsetFile_(
params.getUntrackedParameter<
std::
string>(
"orbitOffsetFile",
"")),
393 logFileName_(
params.getUntrackedParameter<
std::
string>(
"logFileName",
"matacqProducer.log")),
394 eventSkipCounter_(0),
395 onErrorDisablingEvtCnt_(
params.getParameter<
int>(
"onErrorDisablingEvtCnt")),
396 timeLogFile_(
params.getUntrackedParameter<
std::
string>(
"timeLogFile",
"")),
399 cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer ctor" << endl;
401 gettimeofday(&
timer_,
nullptr);
406 cout <<
"[LaserSorter " <<
now() <<
"] " 407 <<
"Failed to open file " <<
timeLogFile_ <<
" to log timing.\n";
427 <<
"] registering new " 428 "EcalMatacqDigiCollection product with instance name '" 436 <<
"] registering new FEDRawDataCollection " 437 "product with instance name '" 450 cout <<
"[Matacq " <<
now() <<
"] exiting MatacqProducer ctor" << endl;
455 cout <<
"[Matacq " <<
now() <<
"] in MatacqProducer::produce" << endl;
458 gettimeofday(&
t,
nullptr);
460 timeLog_ <<
t.tv_sec <<
"." << setfill(
'0') << setw(3) << (
t.tv_usec + 500) / 1000 << setfill(
' ') <<
"\t" 461 << (
t.tv_usec -
timer_.tv_usec) * 1. + (
t.tv_sec -
timer_.tv_sec) * 1.e6 <<
"\t";
478 gettimeofday(&
t,
nullptr);
488 std::unique_ptr<FEDRawDataCollection> rawColl;
491 rawColl = std::make_unique<FEDRawDataCollection>(*sourceColl);
493 rawColl = std::make_unique<FEDRawDataCollection>();
497 auto digiColl = std::make_unique<EcalMatacqDigiCollection>();
519 <<
". No orbit correction will be applied.";
525 LogInfo(
"Matacq") <<
"Matacq data file found for " 526 <<
"run " <<
runNumber <<
" orbit " << orbitId;
534 LogWarning(
"Matacq") <<
" Error in Matacq event fragment length! " 537 <<
"Matacq data will not be included for this event.\n";
544 <<
" to dcc event with orbit id " << orbitId << std::endl;
552 LogWarning(
"Matacq") <<
"No matacq data found for laser event " 553 <<
"of run " <<
runNumber <<
" orbit " << orbitId;
557 LogWarning(
"Matacq") <<
"No matacq file found for event " <<
event.id();
565 <<
" next events will be skipped, following to an " 566 <<
"error on the last processed event, " 567 <<
"which is expected to be persistant.";
575 cout <<
"[Matacq " <<
now() <<
"] " 576 <<
"Adding FEDRawDataCollection collection " 583 cout <<
"[Matacq " <<
now() <<
"] " 584 <<
"Adding EcalMatacqDigiCollection collection " 638 if (!
mtell(startPos))
641 int32_t startOrb = -1;
642 const size_t headerSize = 8 * 8;
643 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
650 <<
"] Failed to read matacq header. Moved to start of " 654 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
659 cout <<
"[Matacq " <<
now() <<
"] Looks like matacq file is empty" 666 cout <<
"[Matacq " <<
now() <<
"] Last read orbit: " <<
lastOrb_ <<
" looking for orbit " << orbitId
667 <<
". Current file position: " << startPos <<
" Orbit at current position: " << startOrb <<
"\n";
670 bool didCoarseMove =
false;
685 pos = ((int64_t)fsize / evtSize - 1) * evtSize;
688 <<
"] Estimated position was beyond end of file. " 698 cout <<
"[Matacq " <<
now() <<
"] jumping to estimated position " <<
pos <<
"\n";
699 mseek(
pos, SEEK_SET,
"Jumping to estimated event position");
700 if (
mread((
char*)&
data_[0], headerSize,
"Reading matacq header",
true)) {
701 didCoarseMove =
true;
705 didCoarseMove =
false;
706 if (!
mread((
char*)&
data_[0], headerSize,
"Reading event header",
true)) {
713 <<
"] Event orbit outside of orbit range " 714 "of matacq data file events\n";
732 if (startOrb > 0 && (
abs(orb - orbitId) >
abs(startOrb - orbitId))) {
734 cout <<
"[Matacq " <<
now() <<
"] Estimation (-> orbit " << orb
736 "was worst than original position (-> orbit " 737 << startOrb <<
"). Restoring position (" << startPos <<
").\n";
738 mseek(startPos, SEEK_SET);
739 mread((
char*)&
data_[0], headerSize,
"Reading event header",
true);
744 bool searchBackward = (orb > orbitId) ?
true :
false;
750 cout <<
"[Matacq " <<
now() <<
"] read DCC length is null! Cancels matacq event search " 751 <<
" and move matacq file pointer to beginning of the file. " 752 <<
"(" << __FILE__ <<
":" << __LINE__ <<
")." 761 while (
state == searching) {
768 cout <<
"[Matacq " <<
now() <<
"] Header read at file position " <<
pos <<
": orbit = " << orb
769 <<
" len = " << len <<
"x8 Byte" 770 <<
" run = " <<
run <<
"\n";
776 if ((
int)
data_.size() < len * 8) {
781 <<
"] Event found. Reading " 784 if (!
mread((
char*)&
data_[0], len * 8,
"Reading matacq event")) {
786 cout <<
"[Matacq " <<
now() <<
"] Failed to read matacq event." 792 if ((searchBackward && (orb < orbitId)) || (!searchBackward && (orb > orbitId))) {
796 cout <<
"[Matacq " <<
now() <<
"] No matacq data found for run " <<
run <<
", orbit ID " << orbitId <<
"." 799 off_t
offset = (searchBackward ? -len : len) * 8;
802 cout <<
"[Matacq " <<
now() <<
"] In matacq file, moving " <<
abs(
offset) <<
" byte " 803 << (
offset > 0 ?
"forward" :
"backward") <<
".\n";
806 if (
mseek(
offset, SEEK_CUR, (searchBackward ?
"Moving to previous event" :
"Moving to next event")) &&
807 mread((
char*)&
data_[0], headerSize,
"Reading event header",
true)) {
810 mseek(-len * 8, SEEK_CUR,
"Moving to start of last complete event");
822 if (
pos == fsize - 1) {
825 <<
"] Event found was at the end of the file. Moving " 826 "stream position to beginning of this event." 829 mseek(-(
int)len * 8 - 1, SEEK_CUR,
"Moving to beginning of last matacq event");
837 uint32_t firstOrb, lastOrb;
840 if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
841 if (fileChange !=
nullptr)
850 const string runNumberFormat =
"%08d{,_*}";
851 string sRunNumber = fmt::sprintf(runNumberFormat,
runNumber);
860 for (
int itry = 0; itry < 1 && (orbitId > maxOrb); ++itry) {
863 std::cout <<
"[Matacq " <<
now() <<
"] Event orbit id (" << orbitId
865 "beyound the range of available one. Waiting for " 867 <<
" seconds in case " 868 "it was not written yet to disk.";
875 boost::algorithm::replace_all(
fname,
"%run_number%", sRunNumber);
878 int rc = glob(
fname.c_str(), GLOB_BRACE,
nullptr, &
g);
884 <<
"] Running out of memory while calling glob function to look for matacq file paths\n";
888 <<
"] Read error while calling glob function to look for matacq file paths\n";
897 for (
unsigned iglob = 0; iglob <
g.gl_pathc; ++iglob) {
898 char* thePath =
g.gl_pathv[iglob];
900 static std::atomic<int> nOpenErrors{0};
901 const int maxOpenErrors = 50;
902 if (!
mopen(thePath) && nOpenErrors < maxOpenErrors) {
903 std::cout <<
"[Matacq " <<
now() <<
"] Failed to open file " << thePath;
905 if (nOpenErrors == maxOpenErrors) {
906 std::cout << nOpenErrors <<
"This is the " << maxOpenErrors
907 <<
"th occurence of this error. Report of this error is now disabled.\n";
915 std::cout <<
"Get orbit range " << (goodRange ?
"succeeded" :
"failed") <<
". Range: " << firstOrb <<
"..." 917 if (goodRange && lastOrb > maxOrb)
919 if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
933 LogInfo(
"Matacq") <<
"Uses matacq data file: '" <<
fname <<
"'\n";
937 <<
"] no matacq file found " 939 <<
runNumber <<
", orbit " << orbitId <<
"\n";
942 if (fileChange !=
nullptr)
951 if (fileChange !=
nullptr)
969 throw cms::Exception(
"NotFound") <<
"No FED raw data collection found. ECAL raw data are " 970 "required to retrieve the orbit ID";
974 for (
int id = 601;
id <= 654; ++
id) {
978 const int orbitIdOffset64 = 3;
979 if (
data.size() >= 8 * (orbitIdOffset64 + 1)) {
980 const unsigned char* pOrbit =
data.data() + orbitIdOffset64 * 8;
981 int thisOrbit = pOrbit[0] | (pOrbit[1] << 8) | (pOrbit[2] << 16) | (pOrbit[3] << 24);
985 LogWarning(
"EventCorruption") <<
"Orbit ID inconsitency in DCC headers";
997 LogWarning(
"NotFound") <<
"Failed to retrieve orbit ID of event " <<
ev.id();
1006 throw cms::Exception(
"NotFound") <<
"No FED raw data collection found. ECAL raw data are " 1007 "required to retrieve the trigger type";
1011 for (
int id = 601;
id <= 654; ++
id) {
1015 const int detailedTrigger32 = 5;
1016 if (
data.size() >= 4 * (detailedTrigger32 + 1)) {
1017 const unsigned char* pTType =
data.data() + detailedTrigger32 * 4;
1018 int tType = pTType[1] & 0x7;
1023 int tType =
stat.result(&
p);
1026 LogWarning(
"NotFound") <<
"No ECAL DCC data found\n";
1031 LogWarning(
"EventCorruption") <<
"Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
1040 const size_t headerSize = 8 * 8;
1041 unsigned char data[headerSize];
1042 if (!mp->
mread((
char*)
data, headerSize)) {
1044 cout <<
"[Matacq " <<
now() <<
"] reached end of file!\n";
1059 cout <<
"[Matacq " <<
now() <<
"] event length is null!" << endl;
1068 cout <<
"[Matacq " <<
now() <<
"] File is missing!" << endl;
1071 }
else if (
s == 0) {
1073 cout <<
"[Matacq " <<
now() <<
"] File is empty!" << endl;
1082 cout <<
"[Matacq " <<
now() <<
"] File size: " <<
s <<
" Number of events: " <<
nEvents << endl;
1088 "Moving to beginning of last complete " 1090 if (!mp->
mread((
char*)
data, headerSize,
"Reading matacq header",
true)) {
1091 LogWarning(
"Matacq") <<
"Fast matacq event retrieval failure. " 1092 "Falling back to safe retrieval mode.";
1100 cout <<
"[Matacq " <<
now() <<
"] Last event orbit: " << lastOrb <<
" last event length: " << lastLen << endl;
1106 <<
"Fast matacq event retrieval failure: it looks like " 1107 "the matacq file contains events of different sizes.";
1123 if (orb < firstOrbit_)
1125 uint64_t r = orbitStepMean_ != 0 ? (((
uint64_t)(orb - firstOrbit_)) / orbitStepMean_) * eventLength_ * 8 : 0;
1127 cout <<
"[Matacq " <<
now() <<
"] Estimated Position for orbit " << orb <<
": " <<
r << endl;
1134 gettimeofday(&
t,
nullptr);
1138 <<
"] Time elapsed between first event and " 1139 "destruction of MatacqProducer: " 1150 cout <<
"[Matacq " <<
now() <<
"] " 1151 <<
"Offset to substract to Matacq events Orbit ID: \n" 1152 <<
"#Run Number\t Offset\n";
1172 cout <<
run <<
"\t" << orbit <<
"\n";
1177 #ifdef USE_STORAGE_MANAGER 1182 Storage::Relative wh;
1183 if (whence == SEEK_SET)
1185 else if (whence == SEEK_CUR)
1186 wh = Storage::CURRENT;
1187 else if (whence == SEEK_END)
1190 throw cms::Exception(
"Bug") <<
"Bug found in " << __FILE__ <<
": " << __LINE__ <<
"\n";
1195 cout <<
"[Matacq " <<
now() <<
"] ";
1197 cout << mess <<
". ";
1198 cout <<
"Random access error on input matacq file. ";
1199 if (whence == SEEK_SET)
1200 cout <<
"Failed to seek absolute position " <<
offset;
1201 else if (whence == SEEK_CUR)
1202 cout <<
"Failed to move " <<
offset <<
" bytes forward";
1203 else if (whence == SEEK_END)
1204 cout <<
"Failed to seek position at " <<
offset <<
" bytes before end of file";
1205 cout <<
". Reopening file. " <<
e.what() <<
"\n";
1233 cout <<
"[Matacq " <<
now() <<
"] ";
1235 cout << mess <<
". ";
1236 cout <<
"Read failure from input matacq file: " <<
e.what() <<
"\n";
1264 cout <<
"Exception cautgh while rewinding file " <<
inFileName_ <<
": " <<
e.what() <<
". " 1265 <<
"File will be reopened.";
1304 #else //USE_STORAGE_MANAGER not defined 1310 cout <<
"[Matacq " <<
now() <<
"] ";
1312 cout << mess <<
". ";
1313 cout <<
"Random access error on input matacq file. " 1334 cout <<
"[Matacq " <<
now() <<
"] ";
1336 cout << mess <<
". ";
1337 cout <<
"Read failure from input matacq file.\n";
1344 cout <<
"[Matacq " <<
now() <<
"] ";
1346 cout << mess <<
". ";
1347 cout <<
"Failed to restore file position of " 1348 "before read error. Rewind file.\n";
1375 return fseeko(
inFile_, 0, SEEK_SET) != 0;
1418 #endif //USE_STORAGE_MANAGER defined 1421 int millions =
runNumber / (1000 * 1000);
1422 int thousands = (
runNumber - millions * 1000 * 1000) / 1000;
1423 int units =
runNumber - millions * 1000 * 1000 - thousands * 1000;
1424 return fmt::sprintf(
"%03d/%03d/%03d", millions, thousands,
units);
1446 const unsigned headerSize = 8 * 8;
1447 unsigned char header[headerSize];
1450 if (!
mread((
char*)
header, headerSize,
nullptr,
false))
1456 unsigned nEvts = fsize / (len * 8);
static const char runNumber_[]
int getCalibTriggerType(edm::Event &ev) const
~MatacqProducer() override
static const int matacqFedId_
static const int orbitTolerance_
bool mseek(filepos_t offset, int whence=SEEK_SET, const char *mess=nullptr)
bool getMatacqFile(uint32_t runNumber, uint32_t orbitId, bool *fileChange=nullptr)
uint32_t getRunNumber(edm::Event &ev) const
bool operator<(const MatacqEventId &a)
bool getOrbitRange(uint32_t &firstOrb, uint32_t &lastOrb)
static unsigned getOrbitId(unsigned char *data, size_t size)
unsigned getDccLen() const
std::vector< std::string > fileNames_
size_t size() const
Lenght of the data buffer in bytes.
std::string digiInstanceName_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e g
int64_t pos(int orb) const
bool mcheck(const std::string &name)
struct MatacqProducer::stats_t stats_
bool mread(char *buf, size_t n, const char *mess=nullptr, bool peek=false)
std::map< uint32_t, uint32_t > orbitOffset_
edm::EDGetTokenT< FEDRawDataCollection > inputRawCollectionToken_
void addMatacqData(edm::Event &event)
double nNonLaserEventsWithMatacq
static std::string runSubDir(uint32_t runNumber)
int onErrorDisablingEvtCnt_
Abs< T >::type abs(const T &t)
#define DEFINE_FWK_MODULE(type)
edm::InputTag inputRawCollection_
double nLaserEventsWithMatacq
std::string orbitOffsetFile_
void produce(edm::Event &event, const edm::EventSetup &eventSetup) override
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
Log< level::Info, false > LogInfo
bool operator>(const MatacqEventId &a)
unsigned long long uint64_t
MatacqDataFormatter formatter_
NullOut & operator<<(std::ostream &(*pf)(std::ostream &))
bool operator==(const MatacqEventId &a)
uint32_t getOrbitId(edm::Event &ev) const
uint32_t openedFileRunNumber_
MatacqEventId(uint32_t r, uint32_t o)
void init(MatacqProducer *mp)
bool mtell(filepos_t &pos)
static const int bufferSize
MatacqProducer(const edm::ParameterSet ¶ms)
TString units(TString variable, Char_t axis)
char data[epos_bytes_allocation]
std::string rawInstanceName_
unsigned getRunNum() const
uint32_t getOrbitId() const
Log< level::Warning, false > LogWarning
std::vector< unsigned char > data_
void newRun(int prevRun, int newRun)
static const stats_t stats_init
NullOut & operator<<(const T &a)
bool mopen(const std::string &name)
bool getMatacqEvent(uint32_t runNumber, int32_t orbitId, bool fileChange)