16 #include <boost/algorithm/string.hpp>
49 using namespace evf::FastMonState;
52 : edm::RawInputSource(pset, desc),
53 defPath_(pset.getUntrackedParameter<std::
string>(
"buDefPath",
"")),
54 eventChunkSize_(pset.getUntrackedParameter<unsigned int>(
"eventChunkSize", 32) * 1048576),
55 eventChunkBlock_(pset.getUntrackedParameter<unsigned int>(
"eventChunkBlock", 32) * 1048576),
56 numBuffers_(pset.getUntrackedParameter<unsigned int>(
"numBuffers", 2)),
57 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>(
"maxBufferedFiles", 2)),
58 getLSFromFilename_(pset.getUntrackedParameter<bool>(
"getLSFromFilename",
true)),
59 alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>(
"alwaysStartFromFirstLS",
false)),
60 verifyChecksum_(pset.getUntrackedParameter<bool>(
"verifyChecksum",
true)),
61 useL1EventID_(pset.getUntrackedParameter<bool>(
"useL1EventID",
false)),
63 pset.getUntrackedParameter<std::
vector<unsigned int>>(
"testTCDSFEDRange", std::
vector<unsigned int>())),
65 fileListMode_(pset.getUntrackedParameter<bool>(
"fileListMode",
false)),
66 fileListLoopMode_(pset.getUntrackedParameter<bool>(
"fileListLoopMode",
false)),
71 currentLumiSection_(0),
72 tcds_pointer_(nullptr),
75 gethostname(thishost, 255);
76 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: " << std::endl
81 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection")
82 <<
"Invalid TCDS Test FED range parameter";
88 long autoRunNumber = -1;
92 if (autoRunNumber < 0)
93 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Run number not found from filename";
112 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource")
113 <<
"no reading enabled with numBuffers parameter 0";
120 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
127 edm::LogInfo(
"FedRawDataInputSource") <<
"No FastMonitoringService found in the configuration";
132 throw cms::Exception(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
138 cms::Exception(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
142 edm::LogInfo(
"FedRawDataInputSource") <<
"EvFDaqDirector/Source configured to use file service";
163 cvReader_.push_back(
new std::condition_variable);
165 threadInit_.store(
false, std::memory_order_release);
185 std::unique_lock<std::mutex> lk(
mReader_);
206 desc.
setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk");
207 desc.
addUntracked<
unsigned int>(
"eventChunkSize", 32)->setComment(
"Input buffer (chunk) size");
209 ->setComment(
"Block size used in a single file read call (must be smaller or equal to buffer size)");
210 desc.
addUntracked<
unsigned int>(
"numBuffers", 2)->setComment(
"Number of buffers used for reading input");
212 ->setComment(
"Maximum number of simultaneously buffered raw files");
213 desc.
addUntracked<
unsigned int>(
"alwaysStartFromfirstLS",
false)
214 ->setComment(
"Force source to start from LS 1 if server provides higher lumisection number");
216 ->setComment(
"Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
218 ->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
219 desc.
addUntracked<std::vector<unsigned int>>(
"testTCDSFEDRange", std::vector<unsigned int>())
220 ->
setComment(
"[min, max] range to search for TCDS FED ID in test setup");
222 ->setComment(
"Use fileNames parameter to directly specify raw files to open");
223 desc.
addUntracked<std::vector<std::string>>(
"fileNames", std::vector<std::string>())
224 ->
setComment(
"file list used when fileListMode is enabled");
226 descriptions.
add(
"source", desc);
254 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
265 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
271 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
314 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
328 gettimeofday(&tv,
nullptr);
329 const edm::Timestamp lsopentime((
unsigned long long)tv.tv_sec * 1000000 + (
unsigned long long)tv.tv_usec);
337 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: " << lumiSection;
358 std::unique_lock<std::mutex> lkw(
mWakeup_);
369 <<
"Run has been aborted by the input source reader thread";
414 <<
" but according to BU JSON there should be " <<
currentFile_->nEvents_ <<
" events";
418 std::unique_lock<std::mutex> lkw(
mWakeup_);
438 <<
"Premature end of input file while reading file header";
441 <<
"File with only raw header and no events received in LS " <<
currentFile_->lumi_;
458 <<
"Premature end of input file while reading event header";
477 if (detectedFRDversion_ == 0) {
478 detectedFRDversion_ = *((uint16_t*)dataPosition);
482 assert(detectedFRDversion_ >= 1);
489 <<
"Premature end of input file while reading event header";
493 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
496 <<
" event id:" <<
event_->event() <<
" lumi:" <<
event_->lumi() <<
" run:" <<
event_->run()
505 <<
"Premature end of input file while reading event data";
511 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
531 unsigned char* dataPosition;
536 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
539 <<
" event id:" <<
event_->event() <<
" lumi:" <<
event_->lumi() <<
" run:" <<
event_->run()
548 <<
"Premature end of input file while reading event data";
571 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
574 chunkEnd =
currentFile_->advance(dataPosition, msgSize);
585 if (crc !=
event_->crc32c()) {
589 <<
"Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
" but calculated 0x"
593 uint32_t adler = adler32(0
L, Z_NULL, 0);
594 adler = adler32(adler, (Bytef*)
event_->payload(),
event_->eventSize());
596 if (adler !=
event_->adler32()) {
600 <<
"Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
" but calculated 0x"
625 <<
"No TCDS or GTP FED in event with FEDHeader EID -: " <<
L1EventID_;
666 bool fileIsBeingProcessed =
false;
669 fileIsBeingProcessed =
true;
673 if (!fileIsBeingProcessed) {
690 gettimeofday(&stv,
nullptr);
692 time = (time << 32) + stv.tv_usec;
695 uint32_t eventSize =
event_->eventSize();
696 unsigned char*
event = (
unsigned char*)
event_->payload();
700 uint16_t selectedTCDSFed = 0;
701 while (eventSize > 0) {
711 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
713 if (!selectedTCDSFed) {
714 selectedTCDSFed =
fedId;
720 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection")
721 <<
"Second TCDS FED ID " << fedId <<
" found. First ID: " << selectedTCDSFed;
731 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
741 memcpy(fedData.
data(),
event + eventSize, fedSize);
752 unsigned int currentLumiSection = 0;
762 uint32_t lockCount = 0;
773 bool copy_active =
false;
791 std::unique_lock<std::mutex> lkw(
mWakeup_);
796 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
812 uint32_t fileSizeIndex;
813 int64_t fileSizeFromMetadata;
821 uint16_t rawHeaderSize = 0;
822 uint32_t lsFromRaw = 0;
823 int32_t serverEventsInNewFile = -1;
837 edm::LogWarning(
"FedRawDataInputSource") <<
"Input throttled detected, reading files is paused...";
852 status =
getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
858 serverEventsInNewFile,
859 fileSizeFromMetadata,
880 serverEventsInNewFile,
881 fileSizeFromMetadata,
892 if (thisLockWaitTimeUs > 0.)
893 sumLockWaitTimeUs += thisLockWaitTimeUs;
901 sumLockWaitTimeUs = 0;
931 if (ls > currentLumiSection) {
935 currentLumiSection =
ls;
943 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
955 }
else if (ls < 100) {
959 for (
unsigned int nextLS =
std::min(lsToStart, ls); nextLS <=
ls; nextLS++) {
970 for (
unsigned int nextLS = currentLumiSection + 1; nextLS <=
ls; nextLS++) {
975 currentLumiSection =
ls;
979 if (currentLumiSection > 0 && ls < currentLumiSection) {
981 <<
"Got old LS (" << ls <<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection
982 <<
". Aborting execution." << std::endl;
997 if (!(dbgcount % 20))
998 LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
1002 backoff_exp =
std::min(4, backoff_exp);
1004 int sleeptime = (int)(100000. *
pow(2, backoff_exp));
1014 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
1022 rawFile = rawFilePath.replace_extension(
".raw").string();
1026 int stat_res =
stat(rawFile.c_str(), &st);
1027 if (stat_res == -1) {
1028 edm::LogError(
"FedRawDataInputSource") <<
"Can not stat file (" << errno <<
"):-" << rawFile << std::endl;
1039 int eventsInNewFile;
1042 eventsInNewFile = 0;
1044 eventsInNewFile = -1;
1048 if (rawHeaderSize) {
1049 int rawFdEmpty = -1;
1050 uint16_t rawHeaderCheck;
1053 nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0,
true);
1054 assert(fileFound && rawHeaderCheck == rawHeaderSize);
1059 eventsInNewFile = serverEventsInNewFile;
1060 assert(eventsInNewFile >= 0);
1061 assert((eventsInNewFile > 0) ==
1062 (fileSize > rawHeaderSize));
1071 std::unique_ptr<InputFile> newInputFile(
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1082 auto newInputFilePtr = newInputFile.get();
1085 for (
unsigned int i = 0;
i < neededChunks;
i++) {
1087 bool copy_active =
false;
1097 unsigned int newTid = 0xffffffff;
1107 bool copy_active =
false;
1125 if (newChunk ==
nullptr) {
1127 if (newTid != 0xffffffff)
1136 std::unique_lock<std::mutex> lk(
mReader_);
1139 if (
i == neededChunks - 1 && fileSize % eventChunkSize_)
1141 newChunk->
reset(
i * eventChunkSize_, toRead,
i);
1150 if (!eventsInNewFile) {
1156 std::unique_lock<std::mutex> lkw(
mWakeup_);
1158 std::unique_ptr<InputFile> newInputFile(
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1165 (rawHeaderSize > 0),
1182 std::unique_lock<std::mutex> lkw(
mWakeup_);
1187 newChunk->
reset(0, toRead, 0);
1191 std::unique_ptr<InputFile> newInputFile(
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1201 newInputFile->chunks_[0] = newChunk;
1210 unsigned numFinishedThreads = 0;
1216 std::unique_lock<std::mutex> lk(
mReader_);
1219 numFinishedThreads++;
1229 threadInit_.exchange(
true, std::memory_order_acquire);
1233 std::unique_lock<std::mutex> lk(
mReader_);
1265 bool fileOpenedHere =
false;
1268 fileDescriptor = file->
rawFd_;
1269 if (fileDescriptor == -1) {
1270 fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1271 fileOpenedHere =
true;
1272 file->
rawFd_ = fileDescriptor;
1276 fileDescriptor = file->
rawFd_;
1278 if (fileDescriptor == -1) {
1279 fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1280 fileOpenedHere =
true;
1283 fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1284 fileOpenedHere =
true;
1288 if (fileDescriptor < 0) {
1290 <<
" fd:" << fileDescriptor <<
" error: " << strerror(errno);
1294 if (fileOpenedHere) {
1296 pos = lseek(fileDescriptor, chunk->
offset_, SEEK_SET);
1299 <<
"readWorker failed to seek file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
" to offset "
1300 << chunk->
offset_ <<
" error: " << strerror(errno);
1306 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_
1307 <<
" at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1309 unsigned int skipped = bufferLeft;
1320 <<
" fd:" << fileDescriptor <<
" error: " << strerror(errno);
1330 <<
"readWorker failed to read file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
" last:" << last
1331 <<
" expectedChunkSize:" << chunk->
usedSize_
1333 <<
" block:" << (
i + 1) <<
"/" << readBlocks_ <<
" error: " << strerror(errno);
1344 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1345 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB"
1346 <<
" in " << msec.count() <<
" ms (" << (bufferLeft >> 20) /
double(msec.count())
1350 close(fileDescriptor);
1351 fileDescriptor = -1;
1356 close(fileDescriptor);
1371 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1398 if (currentLeft < size) {
1446 LogDebug(
"FedRawDataInputSource:InputFile") <<
"Deleting input file -:" <<
fileName_;
1449 }
catch (
const std::filesystem::filesystem_error& ex) {
1451 <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() <<
". Trying again.";
1454 <<
" - deleteFile std::exception CAUGHT -: " << ex.what() <<
". Trying again.";
1462 uint32_t existingSize = 0;
1466 if (file->
rawFd_ == -1) {
1478 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1480 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer")
1481 <<
"failed to open file " << std::endl
1487 (
void*)(file->
chunks_[0]->buf_ + existingSize),
1490 existingSize +=
last;
1499 existingSize +=
last;
1510 for (uint32_t
i = 0;
i < blockcount;
i++) {
1511 const ssize_t
last =
1514 existingSizeLeft +=
last;
1525 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
1536 itr->second += events;
1545 std::pair<bool, unsigned int>
ret(
true, itr->second);
1550 return std::pair<bool, unsigned int>(
false, 0);
1555 if (a.rfind(
'/') != std::string::npos)
1556 a = a.substr(a.rfind(
'/'));
1557 if (
b.rfind(
'/') != std::string::npos)
1558 b =
b.substr(
b.rfind(
'/'));
1566 if (fileStem.find(
"file://") == 0)
1567 fileStem = fileStem.substr(7);
1568 else if (fileStem.find(
"file:") == 0)
1569 fileStem = fileStem.substr(5);
1570 auto end = fileStem.find(
'_');
1572 if (fileStem.find(
"run") == 0) {
1576 long rval = std::stol(runStr);
1577 edm::LogInfo(
"FedRawDataInputSource") <<
"Autodetected run number in fileListMode -: " << rval;
1581 <<
"Unable to autodetect run number in fileListMode from file -: " <<
fileName;
1594 if (nextFile.find(
"file://") == 0)
1595 nextFile = nextFile.substr(7);
1596 else if (nextFile.find(
"file:") == 0)
1597 nextFile = nextFile.substr(5);
1600 if (fileStem.find(
"ls"))
1601 fileStem = fileStem.substr(fileStem.find(
"ls") + 2);
1602 if (fileStem.find(
'_'))
1603 fileStem = fileStem.substr(0, fileStem.find(
'_'));
1606 ls = std::stoul(fileStem);
1621 return getFile(ls, nextFile, fsize, lockWaitTime);
static const char runNumber_[]
void setComment(std::string const &value)
tuple ret
prodAgent to be discontinued
unsigned int getgpshigh(const unsigned char *)
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
constexpr size_t FRDHeaderMaxVersion
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
static const uint32_t length
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
string filePath
DQM Environment
volatile std::atomic< bool > shutdown_flag
void createProcessingNotificationMaybe() const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
Log< level::Error, false > LogError
uint32_t fragmentLength() const
The length of the event fragment counted in 64-bit words including header and trailer.
bool useFileBroker() const
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, bool isRealData, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection, bool suppressWarning)
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
void setComment(std::string const &value)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void resize(size_t newsize)
if(conf_.getParameter< bool >("UseStripCablingDB"))
StreamID streamID() const
BranchDescription const & branchDescription() const
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
unsigned long long TimeValue_t
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
unsigned int getLumisectionToStart() const
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
unsigned int getStartLumisectionFromEnv() const
unsigned int gtpe_get(const unsigned char *)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
constexpr std::array< uint32, FRDHeaderMaxVersion+1 > FRDHeaderVersionSize
Log< level::Warning, false > LogWarning
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
Power< A, B >::type pow(const A &a, const B &b)
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const
void setInState(FastMonState::InputState inputState)