16 #include <boost/algorithm/string.hpp>
49 #include <boost/lexical_cast.hpp>
55 defPath_(
pset.getUntrackedParameter<
std::
string>(
"buDefPath",
"")),
56 eventChunkSize_(
pset.getUntrackedParameter<unsigned
int>(
"eventChunkSize", 32) * 1048576),
57 eventChunkBlock_(
pset.getUntrackedParameter<unsigned
int>(
"eventChunkBlock", 32) * 1048576),
58 numBuffers_(
pset.getUntrackedParameter<unsigned
int>(
"numBuffers", 2)),
59 maxBufferedFiles_(
pset.getUntrackedParameter<unsigned
int>(
"maxBufferedFiles", 2)),
60 getLSFromFilename_(
pset.getUntrackedParameter<
bool>(
"getLSFromFilename",
true)),
61 alwaysStartFromFirstLS_(
pset.getUntrackedParameter<
bool>(
"alwaysStartFromFirstLS",
false)),
62 verifyChecksum_(
pset.getUntrackedParameter<
bool>(
"verifyChecksum",
true)),
63 useL1EventID_(
pset.getUntrackedParameter<
bool>(
"useL1EventID",
false)),
67 fileListMode_(
pset.getUntrackedParameter<
bool>(
"fileListMode",
false)),
68 fileListLoopMode_(
pset.getUntrackedParameter<
bool>(
"fileListLoopMode",
false)),
73 currentLumiSection_(0),
74 tcds_pointer_(nullptr),
77 gethostname(thishost, 255);
78 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: " << std::endl
83 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection")
84 <<
"Invalid TCDS Test FED range parameter";
90 long autoRunNumber = -1;
94 if (autoRunNumber < 0)
95 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Run number not found from filename";
114 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource")
115 <<
"no reading enabled with numBuffers parameter 0";
122 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
129 edm::LogInfo(
"FedRawDataInputSource") <<
"No FastMonitoringService found in the configuration";
134 throw cms::Exception(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
140 cms::Exception(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
144 edm::LogInfo(
"FedRawDataInputSource") <<
"EvFDaqDirector/Source configured to use file service";
165 cvReader_.push_back(
new std::condition_variable);
167 threadInit_.store(
false, std::memory_order_release);
187 std::unique_lock<std::mutex> lk(
mReader_);
208 desc.setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk");
209 desc.addUntracked<
unsigned int>(
"eventChunkSize", 32)->setComment(
"Input buffer (chunk) size");
210 desc.addUntracked<
unsigned int>(
"eventChunkBlock", 32)
211 ->setComment(
"Block size used in a single file read call (must be smaller or equal to buffer size)");
212 desc.addUntracked<
unsigned int>(
"numBuffers", 2)->setComment(
"Number of buffers used for reading input");
213 desc.addUntracked<
unsigned int>(
"maxBufferedFiles", 2)
214 ->setComment(
"Maximum number of simultaneously buffered raw files");
215 desc.addUntracked<
unsigned int>(
"alwaysStartFromfirstLS",
false)
216 ->setComment(
"Force source to start from LS 1 if server provides higher lumisection number");
217 desc.addUntracked<
bool>(
"verifyChecksum",
true)
218 ->setComment(
"Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
219 desc.addUntracked<
bool>(
"useL1EventID",
false)
220 ->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
221 desc.addUntracked<std::vector<unsigned int>>(
"testTCDSFEDRange", std::vector<unsigned int>())
222 ->setComment(
"[min, max] range to search for TCDS FED ID in test setup");
223 desc.addUntracked<
bool>(
"fileListMode",
false)
224 ->setComment(
"Use fileNames parameter to directly specify raw files to open");
225 desc.addUntracked<std::vector<std::string>>(
"fileNames", std::vector<std::string>())
226 ->setComment(
"file list used when fileListMode is enabled");
227 desc.setAllowAnything();
228 descriptions.
add(
"source",
desc);
256 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
267 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
273 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
316 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
330 gettimeofday(&tv,
nullptr);
331 const edm::Timestamp lsopentime((
unsigned long long)tv.tv_sec * 1000000 + (
unsigned long long)tv.tv_usec);
339 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: " << lumiSection;
360 std::unique_lock<std::mutex> lkw(
mWakeup_);
371 <<
"Run has been aborted by the input source reader thread";
416 <<
" but according to BU JSON there should be " <<
currentFile_->nEvents_ <<
" events";
420 std::unique_lock<std::mutex> lkw(
mWakeup_);
440 <<
"Premature end of input file while reading file header";
443 <<
"File with only raw header and no events received in LS " <<
currentFile_->lumi_;
460 <<
"Premature end of input file while reading event header";
491 <<
"Premature end of input file while reading event header";
495 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
498 <<
" event id:" <<
event_->event() <<
" lumi:" <<
event_->lumi() <<
" run:" <<
event_->run()
507 <<
"Premature end of input file while reading event data";
513 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
533 unsigned char* dataPosition;
538 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
541 <<
" event id:" <<
event_->event() <<
" lumi:" <<
event_->lumi() <<
" run:" <<
event_->run()
550 <<
"Premature end of input file while reading event data";
573 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
576 chunkEnd =
currentFile_->advance(dataPosition, msgSize);
587 if (crc !=
event_->crc32c()) {
591 <<
"Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
" but calculated 0x"
595 uint32_t adler = adler32(0
L, Z_NULL, 0);
596 adler = adler32(adler, (Bytef*)
event_->payload(),
event_->eventSize());
598 if (adler !=
event_->adler32()) {
602 <<
"Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
" but calculated 0x"
627 <<
"No TCDS or GTP FED in event with FEDHeader EID -: " <<
L1EventID_;
641 static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.
triggerType()),
668 bool fileIsBeingProcessed =
false;
671 fileIsBeingProcessed =
true;
675 if (!fileIsBeingProcessed) {
692 gettimeofday(&stv,
nullptr);
697 uint32_t eventSize =
event_->eventSize();
698 unsigned char*
event = (
unsigned char*)
event_->payload();
702 uint16_t selectedTCDSFed = 0;
703 while (eventSize > 0) {
713 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
715 if (!selectedTCDSFed) {
716 selectedTCDSFed =
fedId;
722 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection")
723 <<
"Second TCDS FED ID " <<
fedId <<
" found. First ID: " << selectedTCDSFed;
733 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
743 memcpy(fedData.
data(),
event + eventSize, fedSize);
754 unsigned int currentLumiSection = 0;
764 uint32_t lockCount = 0;
775 bool copy_active =
false;
793 std::unique_lock<std::mutex> lkw(
mWakeup_);
798 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
814 uint32_t fileSizeIndex;
815 int64_t fileSizeFromMetadata;
823 uint16_t rawHeaderSize = 0;
824 uint32_t lsFromRaw = 0;
825 int32_t serverEventsInNewFile = -1;
839 edm::LogWarning(
"FedRawDataInputSource") <<
"Input throttled detected, reading files is paused...";
860 serverEventsInNewFile,
861 fileSizeFromMetadata,
882 serverEventsInNewFile,
883 fileSizeFromMetadata,
894 if (thisLockWaitTimeUs > 0.)
895 sumLockWaitTimeUs += thisLockWaitTimeUs;
903 sumLockWaitTimeUs = 0;
933 if (
ls > currentLumiSection) {
937 currentLumiSection =
ls;
947 for (
unsigned int nextLS =
std::min(lsToStart,
ls); nextLS <=
ls; nextLS++) {
958 for (
unsigned int nextLS = currentLumiSection + 1; nextLS <=
ls; nextLS++) {
963 currentLumiSection =
ls;
967 if (currentLumiSection > 0 &&
ls < currentLumiSection) {
969 <<
"Got old LS (" <<
ls <<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection
970 <<
". Aborting execution." << std::endl;
985 if (!(dbgcount % 20))
986 LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
990 backoff_exp =
std::min(4, backoff_exp);
992 int sleeptime = (
int)(100000. *
pow(2, backoff_exp));
1002 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
1010 rawFile = rawFilePath.replace_extension(
".raw").string();
1014 int stat_res =
stat(rawFile.c_str(), &st);
1015 if (stat_res == -1) {
1016 edm::LogError(
"FedRawDataInputSource") <<
"Can not stat file (" << errno <<
"):-" << rawFile << std::endl;
1027 int eventsInNewFile;
1030 eventsInNewFile = 0;
1032 eventsInNewFile = -1;
1036 if (rawHeaderSize) {
1037 int rawFdEmpty = -1;
1038 uint16_t rawHeaderCheck;
1041 nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0,
true);
1042 assert(fileFound && rawHeaderCheck == rawHeaderSize);
1047 eventsInNewFile = serverEventsInNewFile;
1048 assert(eventsInNewFile >= 0);
1049 assert((eventsInNewFile > 0) ==
1050 (fileSize > rawHeaderSize));
1059 std::unique_ptr<InputFile> newInputFile(
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1070 auto newInputFilePtr = newInputFile.get();
1073 for (
unsigned int i = 0;
i < neededChunks;
i++) {
1075 bool copy_active =
false;
1085 unsigned int newTid = 0xffffffff;
1095 bool copy_active =
false;
1113 if (newChunk ==
nullptr) {
1115 if (newTid != 0xffffffff)
1124 std::unique_lock<std::mutex> lk(
mReader_);
1138 if (!eventsInNewFile) {
1144 std::unique_lock<std::mutex> lkw(
mWakeup_);
1146 std::unique_ptr<InputFile> newInputFile(
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1153 (rawHeaderSize > 0),
1170 std::unique_lock<std::mutex> lkw(
mWakeup_);
1175 newChunk->
reset(0, toRead, 0);
1179 std::unique_ptr<InputFile> newInputFile(
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1189 newInputFile->chunks_[0] = newChunk;
1198 unsigned numFinishedThreads = 0;
1204 std::unique_lock<std::mutex> lk(
mReader_);
1207 numFinishedThreads++;
1217 threadInit_.exchange(
true, std::memory_order_acquire);
1221 std::unique_lock<std::mutex> lk(
mReader_);
1248 unsigned int bufferLeft = (chunk->
offset_ == 0 &&
file->rawFd_ != -1) ?
file->rawHeaderSize_ : 0;
1253 bool fileOpenedHere =
false;
1256 fileDescriptor =
file->rawFd_;
1257 if (fileDescriptor == -1) {
1258 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1259 fileOpenedHere =
true;
1260 file->rawFd_ = fileDescriptor;
1264 fileDescriptor =
file->rawFd_;
1266 if (fileDescriptor == -1) {
1267 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1268 fileOpenedHere =
true;
1271 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1272 fileOpenedHere =
true;
1276 if (fileDescriptor < 0) {
1277 edm::LogError(
"FedRawDataInputSource") <<
"readWorker failed to open file -: " <<
file->fileName_
1278 <<
" fd:" << fileDescriptor <<
" error: " << strerror(errno);
1282 if (fileOpenedHere) {
1284 pos = lseek(fileDescriptor, chunk->
offset_, SEEK_SET);
1287 <<
"readWorker failed to seek file -: " <<
file->fileName_ <<
" fd:" << fileDescriptor <<
" to offset "
1288 << chunk->
offset_ <<
" error: " << strerror(errno);
1294 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " <<
file->fileName_
1295 <<
" at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1297 unsigned int skipped = bufferLeft;
1307 edm::LogError(
"FedRawDataInputSource") <<
"readWorker failed to read file -: " <<
file->fileName_
1308 <<
" fd:" << fileDescriptor <<
" error: " << strerror(errno);
1318 <<
"readWorker failed to read file -: " <<
file->fileName_ <<
" fd:" << fileDescriptor <<
" last:" <<
last
1319 <<
" expectedChunkSize:" << chunk->
usedSize_
1321 <<
" block:" << (
i + 1) <<
"/" <<
readBlocks_ <<
" error: " << strerror(errno);
1332 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1333 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB"
1334 <<
" in " << msec.count() <<
" ms (" << (bufferLeft >> 20) /
double(msec.count())
1337 if (chunk->
offset_ + bufferLeft ==
file->fileSize_) {
1338 close(fileDescriptor);
1339 fileDescriptor = -1;
1344 close(fileDescriptor);
1359 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1386 if (currentLeft <
size) {
1434 LogDebug(
"FedRawDataInputSource:InputFile") <<
"Deleting input file -:" <<
fileName_;
1437 }
catch (
const std::filesystem::filesystem_error& ex) {
1439 <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() <<
". Trying again.";
1442 <<
" - deleteFile std::exception CAUGHT -: " << ex.what() <<
". Trying again.";
1450 uint32_t existingSize = 0;
1454 if (
file->rawFd_ == -1) {
1456 if (
file->rawHeaderSize_)
1463 existingSize +=
file->rawHeaderSize_;
1466 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl <<
file->fileName_;
1468 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer")
1469 <<
"failed to open file " << std::endl
1475 (
void*)(
file->chunks_[0]->buf_ + existingSize),
1478 existingSize +=
last;
1483 if (
file->chunkPosition_ == 0) {
1487 existingSize +=
last;
1492 memmove((
void*)
file->chunks_[0]->buf_,
file->chunks_[0]->buf_ +
file->chunkPosition_, existingSizeLeft);
1498 for (uint32_t
i = 0;
i < blockcount;
i++) {
1499 const ssize_t
last =
1502 existingSizeLeft +=
last;
1508 file->chunkPosition_ = 0;
1513 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl <<
file->fileName_;
1533 std::pair<bool, unsigned int>
ret(
true, itr->second);
1538 return std::pair<bool, unsigned int>(
false, 0);
1543 if (
a.rfind(
'/') != std::string::npos)
1544 a =
a.substr(
a.rfind(
'/'));
1545 if (
b.rfind(
'/') != std::string::npos)
1546 b =
b.substr(
b.rfind(
'/'));
1554 if (fileStem.find(
"file://") == 0)
1555 fileStem = fileStem.substr(7);
1556 else if (fileStem.find(
"file:") == 0)
1557 fileStem = fileStem.substr(5);
1558 auto end = fileStem.find(
'_');
1560 if (fileStem.find(
"run") == 0) {
1564 long rval = boost::lexical_cast<long>(runStr);
1565 edm::LogInfo(
"FedRawDataInputSource") <<
"Autodetected run number in fileListMode -: " << rval;
1567 }
catch (boost::bad_lexical_cast
const&) {
1569 <<
"Unable to autodetect run number in fileListMode from file -: " <<
fileName;
1582 if (nextFile.find(
"file://") == 0)
1583 nextFile = nextFile.substr(7);
1584 else if (nextFile.find(
"file:") == 0)
1585 nextFile = nextFile.substr(5);
1588 if (fileStem.find(
"ls"))
1589 fileStem = fileStem.substr(fileStem.find(
"ls") + 2);
1590 if (fileStem.find(
'_'))
1591 fileStem = fileStem.substr(0, fileStem.find(
'_'));
1594 ls = boost::lexical_cast<unsigned int>(fileStem);
1609 return getFile(
ls, nextFile, fsize, lockWaitTime);