16 #include <boost/algorithm/string.hpp> 54 defPath_(
pset.getUntrackedParameter<
std::
string>(
"buDefPath",
"")),
55 eventChunkSize_(
pset.getUntrackedParameter<unsigned
int>(
"eventChunkSize", 32) * 1048576),
56 eventChunkBlock_(
pset.getUntrackedParameter<unsigned
int>(
"eventChunkBlock", 32) * 1048576),
57 numBuffers_(
pset.getUntrackedParameter<unsigned
int>(
"numBuffers", 2)),
58 maxBufferedFiles_(
pset.getUntrackedParameter<unsigned
int>(
"maxBufferedFiles", 2)),
59 getLSFromFilename_(
pset.getUntrackedParameter<
bool>(
"getLSFromFilename",
true)),
60 alwaysStartFromFirstLS_(
pset.getUntrackedParameter<
bool>(
"alwaysStartFromFirstLS",
false)),
61 verifyChecksum_(
pset.getUntrackedParameter<
bool>(
"verifyChecksum",
true)),
62 useL1EventID_(
pset.getUntrackedParameter<
bool>(
"useL1EventID",
false)),
66 fileListMode_(
pset.getUntrackedParameter<
bool>(
"fileListMode",
false)),
67 fileListLoopMode_(
pset.getUntrackedParameter<
bool>(
"fileListLoopMode",
false)),
72 currentLumiSection_(0),
73 tcds_pointer_(nullptr),
76 gethostname(thishost, 255);
77 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: " << std::endl
82 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection")
83 <<
"Invalid TCDS Test FED range parameter";
89 long autoRunNumber = -1;
93 if (autoRunNumber < 0)
94 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Run number not found from filename";
113 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource")
114 <<
"no reading enabled with numBuffers parameter 0";
121 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
128 edm::LogInfo(
"FedRawDataInputSource") <<
"No FastMonitoringService found in the configuration";
133 throw cms::Exception(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
139 cms::Exception(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
143 edm::LogInfo(
"FedRawDataInputSource") <<
"EvFDaqDirector/Source configured to use file service";
163 cvReader_.push_back(std::make_unique<std::condition_variable>());
190 it->second->unsetDeleteFile();
205 std::unique_lock<std::mutex> lk(
mReader_);
217 desc.setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk");
218 desc.addUntracked<
unsigned int>(
"eventChunkSize", 32)->setComment(
"Input buffer (chunk) size");
219 desc.addUntracked<
unsigned int>(
"eventChunkBlock", 32)
220 ->setComment(
"Block size used in a single file read call (must be smaller or equal to buffer size)");
221 desc.addUntracked<
unsigned int>(
"numBuffers", 2)->setComment(
"Number of buffers used for reading input");
222 desc.addUntracked<
unsigned int>(
"maxBufferedFiles", 2)
223 ->setComment(
"Maximum number of simultaneously buffered raw files");
224 desc.addUntracked<
unsigned int>(
"alwaysStartFromfirstLS",
false)
225 ->setComment(
"Force source to start from LS 1 if server provides higher lumisection number");
226 desc.addUntracked<
bool>(
"verifyChecksum",
true)
227 ->setComment(
"Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
228 desc.addUntracked<
bool>(
"useL1EventID",
false)
229 ->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
230 desc.addUntracked<std::vector<unsigned int>>(
"testTCDSFEDRange", std::vector<unsigned int>())
231 ->setComment(
"[min, max] range to search for TCDS FED ID in test setup");
232 desc.addUntracked<
bool>(
"fileListMode",
false)
233 ->setComment(
"Use fileNames parameter to directly specify raw files to open");
234 desc.addUntracked<std::vector<std::string>>(
"fileNames", std::vector<std::string>())
235 ->setComment(
"file list used when fileListMode is enabled");
236 desc.setAllowAnything();
237 descriptions.
add(
"source",
desc);
264 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
275 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
281 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
324 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
338 gettimeofday(&tv,
nullptr);
339 const edm::Timestamp lsopentime((
unsigned long long)tv.tv_sec * 1000000 + (
unsigned long long)tv.tv_usec);
347 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: " << lumiSection;
370 std::unique_lock<std::mutex> lkw(
mWakeup_);
382 <<
"Run has been aborted by the input source reader thread";
427 <<
" but according to BU JSON there should be " <<
currentFile_->nEvents_ <<
" events";
431 std::unique_lock<std::mutex> lkw(
mWakeup_);
451 <<
"Premature end of input file while reading file header";
454 <<
"File with only raw header and no events received in LS " <<
currentFile_->lumi_;
471 <<
"Premature end of input file while reading event header";
505 <<
"Premature end of input file while reading event header";
509 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
512 <<
" event id:" <<
event_->event() <<
" lumi:" <<
event_->lumi() <<
" run:" <<
event_->run()
521 <<
"Premature end of input file while reading event data";
527 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
550 unsigned char* dataPosition;
555 <<
"Premature end of input file (missing:" 557 <<
") while reading event data for next event header";
560 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
563 <<
" event id:" <<
event_->event() <<
" lumi:" <<
event_->lumi() <<
" run:" <<
event_->run()
572 <<
"Premature end of input file (missing:" << (msgSize -
currentFile_->fileSizeLeft())
573 <<
") while reading event data for event " <<
event_->event() <<
" lumi:" <<
event_->lumi();
597 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
600 chunkEnd =
currentFile_->advance(dataPosition, msgSize);
609 <<
" after reading last event declared size of " <<
event_->size() <<
" bytes";
617 if (crc !=
event_->crc32c()) {
621 <<
"Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
" but calculated 0x" 625 uint32_t adler = adler32(0
L, Z_NULL, 0);
626 adler = adler32(adler, (Bytef*)
event_->payload(),
event_->eventSize());
628 if (adler !=
event_->adler32()) {
632 <<
"Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
" but calculated 0x" 657 <<
"No TCDS or GTP FED in event with FEDHeader EID -: " <<
L1EventID_;
698 bool fileIsBeingProcessed =
false;
701 fileIsBeingProcessed =
true;
722 gettimeofday(&stv,
nullptr);
727 uint32_t eventSize =
event_->eventSize();
728 unsigned char*
event = (
unsigned char*)
event_->payload();
732 uint16_t selectedTCDSFed = 0;
733 while (eventSize > 0) {
743 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
745 if (!selectedTCDSFed) {
746 selectedTCDSFed =
fedId;
752 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection")
753 <<
"Second TCDS FED ID " <<
fedId <<
" found. First ID: " << selectedTCDSFed;
763 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
773 memcpy(fedData.
data(),
event + eventSize, fedSize);
784 unsigned int currentLumiSection = 0;
793 uint32_t lockCount = 0;
804 bool copy_active =
false;
822 std::unique_lock<std::mutex> lkw(
mWakeup_);
828 <<
"No free chunks or threads. Worker pool empty:" <<
workerPool_.empty()
832 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
848 uint32_t fileSizeIndex;
849 int64_t fileSizeFromMetadata;
857 uint16_t rawHeaderSize = 0;
858 uint32_t lsFromRaw = 0;
859 int32_t serverEventsInNewFile = -1;
873 unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
874 unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
875 bool hasDiscardedLumi =
false;
876 for (
unsigned int i = checkLumiStart;
i <= currentLumiSection;
i++) {
878 edm::LogWarning(
"FedRawDataInputSource") <<
"Source detected that the lumisection is discarded -: " <<
i;
879 hasDiscardedLumi =
true;
883 if (hasDiscardedLumi)
889 edm::LogWarning(
"FedRawDataInputSource") <<
"Input throttled detected, reading files is paused...";
906 uint16_t rawDataType;
912 serverEventsInNewFile,
913 fileSizeFromMetadata,
934 serverEventsInNewFile,
935 fileSizeFromMetadata,
946 if (thisLockWaitTimeUs > 0.)
947 sumLockWaitTimeUs += thisLockWaitTimeUs;
955 sumLockWaitTimeUs = 0;
985 if (
ls > currentLumiSection) {
989 currentLumiSection =
ls;
997 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
1009 }
else if (
ls < 100) {
1013 for (
unsigned int nextLS =
std::min(lsToStart,
ls); nextLS <=
ls; nextLS++) {
1024 for (
unsigned int nextLS = currentLumiSection + 1; nextLS <=
ls; nextLS++) {
1029 currentLumiSection =
ls;
1033 if (currentLumiSection > 0 &&
ls < currentLumiSection) {
1035 <<
"Got old LS (" <<
ls <<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1036 <<
". Aborting execution." << std::endl;
1051 if (!(dbgcount % 20))
1052 LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
1056 backoff_exp =
std::min(4, backoff_exp);
1058 int sleeptime = (
int)(100000. *
pow(2, backoff_exp));
1068 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
1076 rawFile = rawFilePath.replace_extension(
".raw").string();
1080 int stat_res =
stat(rawFile.c_str(), &st);
1081 if (stat_res == -1) {
1082 edm::LogError(
"FedRawDataInputSource") <<
"Can not stat file (" << errno <<
"):-" << rawFile << std::endl;
1093 int eventsInNewFile;
1096 eventsInNewFile = 0;
1098 eventsInNewFile = -1;
1102 if (rawHeaderSize) {
1103 int rawFdEmpty = -1;
1104 uint16_t rawHeaderCheck;
1107 nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0,
true);
1108 assert(fileFound && rawHeaderCheck == rawHeaderSize);
1113 eventsInNewFile = serverEventsInNewFile;
1114 assert(eventsInNewFile >= 0);
1115 assert((eventsInNewFile > 0) ==
1116 (fileSize > rawHeaderSize));
1125 std::unique_ptr<InputFile> newInputFile(
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1136 auto newInputFilePtr = newInputFile.get();
1139 for (
unsigned int i = 0;
i < neededChunks;
i++) {
1141 bool copy_active =
false;
1151 unsigned int newTid = 0xffffffff;
1161 bool copy_active =
false;
1179 if (newChunk ==
nullptr) {
1181 if (newTid != 0xffffffff)
1190 std::unique_lock<std::mutex> lk(
mReader_);
1204 if (!eventsInNewFile) {
1210 std::unique_lock<std::mutex> lkw(
mWakeup_);
1212 std::unique_ptr<InputFile> newInputFile(
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1219 (rawHeaderSize > 0),
1238 if (newChunk ==
nullptr) {
1245 std::unique_lock<std::mutex> lkw(
mWakeup_);
1250 newChunk->
reset(0, toRead, 0);
1254 std::unique_ptr<InputFile> newInputFile(
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1264 newInputFile->chunks_[0] = newChunk;
1273 unsigned numFinishedThreads = 0;
1279 std::unique_lock<std::mutex> lk(
mReader_);
1282 numFinishedThreads++;
1295 std::unique_lock<std::mutex> lk(
mReader_);
1322 unsigned int bufferLeft = (chunk->
offset_ == 0 &&
file->rawFd_ != -1) ?
file->rawHeaderSize_ : 0;
1327 bool fileOpenedHere =
false;
1330 fileDescriptor =
file->rawFd_;
1331 if (fileDescriptor == -1) {
1332 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1333 fileOpenedHere =
true;
1334 file->rawFd_ = fileDescriptor;
1338 fileDescriptor =
file->rawFd_;
1340 if (fileDescriptor == -1) {
1341 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1342 fileOpenedHere =
true;
1345 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1346 fileOpenedHere =
true;
1350 if (fileDescriptor < 0) {
1351 edm::LogError(
"FedRawDataInputSource") <<
"readWorker failed to open file -: " <<
file->fileName_
1352 <<
" fd:" << fileDescriptor <<
" error: " << strerror(errno);
1356 if (fileOpenedHere) {
1358 pos = lseek(fileDescriptor, chunk->
offset_, SEEK_SET);
1361 <<
"readWorker failed to seek file -: " <<
file->fileName_ <<
" fd:" << fileDescriptor <<
" to offset " 1362 << chunk->
offset_ <<
" error: " << strerror(errno);
1368 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " <<
file->fileName_
1369 <<
" at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1371 unsigned int skipped = bufferLeft;
1378 (
void*)(chunk->
buf_ + bufferLeft),
1382 edm::LogError(
"FedRawDataInputSource") <<
"readWorker failed to read file -: " <<
file->fileName_
1383 <<
" fd:" << fileDescriptor <<
" error: " << strerror(errno);
1393 <<
"readWorker failed to read file -: " <<
file->fileName_ <<
" fd:" << fileDescriptor <<
" last:" <<
last 1394 <<
" expectedChunkSize:" << chunk->
usedSize_ 1396 <<
" block:" << (
i + 1) <<
"/" <<
readBlocks_ <<
" error: " << strerror(errno);
1407 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1408 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" 1409 <<
" in " << msec.count() <<
" ms (" << (bufferLeft >> 20) /
double(msec.count())
1412 if (chunk->
offset_ + bufferLeft ==
file->fileSize_) {
1413 close(fileDescriptor);
1414 fileDescriptor = -1;
1419 close(fileDescriptor);
1434 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1461 if (currentLeft <
size) {
1512 LogDebug(
"FedRawDataInputSource:InputFile") <<
"Deleting input file -:" <<
fileName;
1515 }
catch (
const std::filesystem::filesystem_error& ex) {
1517 <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() <<
". Trying again.";
1520 <<
" - deleteFile std::exception CAUGHT -: " << ex.what() <<
". Trying again.";
1530 uint32_t existingSize = 0;
1534 if (
file->rawFd_ == -1) {
1536 if (
file->rawHeaderSize_)
1543 existingSize +=
file->rawHeaderSize_;
1546 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl <<
file->fileName_;
1548 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer")
1549 <<
"failed to open file " << std::endl
1555 (
void*)(
file->chunks_[0]->buf_ + existingSize),
1558 existingSize +=
last;
1563 if (
file->chunkPosition_ == 0) {
1567 existingSize +=
last;
1572 memmove((
void*)
file->chunks_[0]->buf_,
file->chunks_[0]->buf_ +
file->chunkPosition_, existingSizeLeft);
1578 for (uint32_t
i = 0;
i < blockcount;
i++) {
1579 const ssize_t
last =
1582 existingSizeLeft +=
last;
1588 file->chunkPosition_ = 0;
1593 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl <<
file->fileName_;
1613 std::pair<bool, unsigned int>
ret(
true, itr->second);
1618 return std::pair<bool, unsigned int>(
false, 0);
1623 if (
a.rfind(
'/') != std::string::npos)
1624 a =
a.substr(
a.rfind(
'/'));
1625 if (
b.rfind(
'/') != std::string::npos)
1626 b =
b.substr(
b.rfind(
'/'));
1634 if (fileStem.find(
"file://") == 0)
1635 fileStem = fileStem.substr(7);
1636 else if (fileStem.find(
"file:") == 0)
1637 fileStem = fileStem.substr(5);
1638 auto end = fileStem.find(
'_');
1640 if (fileStem.find(
"run") == 0) {
1644 long rval = std::stol(runStr);
1645 edm::LogInfo(
"FedRawDataInputSource") <<
"Autodetected run number in fileListMode -: " << rval;
1649 <<
"Unable to autodetect run number in fileListMode from file -: " <<
fileName;
1662 if (nextFile.find(
"file://") == 0)
1663 nextFile = nextFile.substr(7);
1664 else if (nextFile.find(
"file:") == 0)
1665 nextFile = nextFile.substr(5);
1668 if (fileStem.find(
"ls"))
1669 fileStem = fileStem.substr(fileStem.find(
"ls") + 2);
1670 if (fileStem.find(
'_'))
1671 fileStem = fileStem.substr(0, fileStem.find(
'_'));
1674 ls = std::stoul(fileStem);
1689 return getFile(
ls, nextFile, fsize, lockWaitTime);
static const char runNumber_[]
unsigned int getgpshigh(const unsigned char *)
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
constexpr std::array< uint32, FRDHeaderMaxVersion+1 > FRDHeaderVersionSize
static const uint32_t length
unsigned int get(const unsigned char *, bool)
bool useFileBroker() const
ret
prodAgent to be discontinued
volatile std::atomic< bool > shutdown_flag
constexpr size_t FRDHeaderMaxVersion
std::string getEoLSFilePathOnFU(const unsigned int ls) const
bool lumisectionDiscarded(unsigned int ls)
bool isSingleStreamThread()
Log< level::Error, false > LogError
unsigned int numConcurrentLumis() const
StreamID streamID() const
bool isExceptionOnData(unsigned int ls)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::string getEoRFilePathOnFU() const
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, bool isRealData, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection, bool suppressWarning)
uint32_t fragmentLength() const
The length of the event fragment counted in 64-bit words including header and trailer.
U second(std::pair< T, U > const &p)
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
static Timestamp beginOfTime()
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
ProductProvenance const & dummyProvenance() const
filePath
CUSTOMIZE FOR ML.
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
unsigned long long TimeValue_t
bool evm_board_sense(const unsigned char *p, size_t size)
void createProcessingNotificationMaybe() const
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
BranchDescription const & branchDescription() const
unsigned int getStartLumisectionFromEnv() const
def getRunNumber(filename)
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
void resize(size_t newsize, size_t wordsize=8)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
void stoppedLookingForFile(unsigned int lumi)
unsigned int getLumisectionToStart() const
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
unsigned int gtpe_get(const unsigned char *)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
Log< level::Warning, false > LogWarning
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
bool exceptionDetected() const
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
Power< A, B >::type pow(const A &a, const B &b)
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
void setInState(FastMonState::InputState inputState)