34 dataModeConfig_(
pset.getUntrackedParameter<
std::
string>(
"dataMode")),
35 eventChunkSize_(
uint64_t(
pset.getUntrackedParameter<unsigned
int>(
"eventChunkSize")) << 20),
36 maxChunkSize_(
uint64_t(
pset.getUntrackedParameter<unsigned
int>(
"maxChunkSize")) << 20),
37 eventChunkBlock_(
uint64_t(
pset.getUntrackedParameter<unsigned
int>(
"eventChunkBlock")) << 20),
38 numBuffers_(
pset.getUntrackedParameter<unsigned
int>(
"numBuffers")),
39 maxBufferedFiles_(
pset.getUntrackedParameter<unsigned
int>(
"maxBufferedFiles")),
40 alwaysStartFromFirstLS_(
pset.getUntrackedParameter<
bool>(
"alwaysStartFromFirstLS",
false)),
41 verifyChecksum_(
pset.getUntrackedParameter<
bool>(
"verifyChecksum")),
42 useL1EventID_(
pset.getUntrackedParameter<
bool>(
"useL1EventID")),
43 testTCDSFEDRange_(
pset.getUntrackedParameter<
std::
vector<unsigned
int>>(
"testTCDSFEDRange")),
45 fileListMode_(
pset.getUntrackedParameter<
bool>(
"fileListMode")),
46 fileListLoopMode_(
pset.getUntrackedParameter<
bool>(
"fileListLoopMode",
false)),
49 currentLumiSection_(0),
51 rng_(
std::chrono::system_clock::
now().time_since_epoch().
count()) {
53 gethostname(thishost, 255);
58 throw cms::Exception(
"DAQSource::DAQSource") <<
"maxChunkSize must be equal or larger than eventChunkSize";
63 throw cms::Exception(
"DAQSource::DAQSource") <<
"eventChunkBlock must be equal or smaller than eventChunkSize";
65 edm::LogInfo(
"DAQSource") <<
"Construction. read-ahead chunk size -: " << std::endl
73 throw cms::Exception(
"DAQSource::DAQSource") <<
"Invalid TCDS Test FED range parameter";
89 dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
90 dataMode_->setTesting(
pset.getUntrackedParameter<
bool>(
"testing",
false));
92 long autoRunNumber = -1;
96 if (autoRunNumber < 0)
97 throw cms::Exception(
"DAQSource::DAQSource") <<
"Run number not found from filename";
106 auto& daqProvenanceHelpers =
dataMode_->makeDaqProvenanceHelpers();
107 for (
const auto& daqProvenanceHelper : daqProvenanceHelpers)
120 throw cms::Exception(
"DAQSource::DAQSource") <<
"no reading enabled with numBuffers parameter 0";
127 edm::LogError(
"DAQSource::DAQSource") <<
"Intel crc32c checksum computation unavailable";
134 edm::LogInfo(
"DAQSource") <<
"No FastMonitoringService found in the configuration";
139 throw cms::Exception(
"DAQSource") <<
"FastMonitoringService not found";
147 edm::LogInfo(
"DAQSource") <<
"EvFDaqDirector/Source configured to use file service";
168 cvReader_.push_back(
new std::condition_variable);
170 threadInit_.store(
false, std::memory_order_release);
191 it->second->unsetDeleteFile();
206 std::unique_lock<std::mutex> lk(
mReader_);
227 desc.setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
228 desc.addUntracked<
std::string>(
"dataMode",
"FRD")->setComment(
"Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
229 desc.addUntracked<
unsigned int>(
"eventChunkSize", 64)->setComment(
"Input buffer (chunk) size");
230 desc.addUntracked<
unsigned int>(
"maxChunkSize", 0)
231 ->setComment(
"Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
232 desc.addUntracked<
unsigned int>(
"eventChunkBlock", 0)
234 "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If " 235 "0 is specified, use chunk size.");
237 desc.addUntracked<
unsigned int>(
"numBuffers", 2)->setComment(
"Number of buffers used for reading input");
238 desc.addUntracked<
unsigned int>(
"maxBufferedFiles", 2)
239 ->setComment(
"Maximum number of simultaneously buffered raw files");
240 desc.addUntracked<
unsigned int>(
"alwaysStartFromfirstLS",
false)
241 ->setComment(
"Force source to start from LS 1 if server provides higher lumisection number");
242 desc.addUntracked<
bool>(
"verifyChecksum",
true)
243 ->setComment(
"Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
244 desc.addUntracked<
bool>(
"useL1EventID",
false)
245 ->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
246 desc.addUntracked<std::vector<unsigned int>>(
"testTCDSFEDRange", std::vector<unsigned int>())
247 ->setComment(
"[min, max] range to search for TCDS FED ID in test setup");
248 desc.addUntracked<
bool>(
"fileListMode",
false)
249 ->setComment(
"Use fileNames parameter to directly specify raw files to open");
250 desc.addUntracked<std::vector<std::string>>(
"fileNames", std::vector<std::string>())
251 ->setComment(
"file list used when fileListMode is enabled");
252 desc.setAllowAnything();
253 descriptions.
add(
"source",
desc);
272 auto nextEvent = [
this]() {
273 auto getNextEvent = [
this]() {
290 switch (nextEvent()) {
299 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
305 edm::LogInfo(
"DAQSource") <<
"----------------RUN ENDED----------------";
336 gettimeofday(&tv,
nullptr);
337 const edm::Timestamp lsopentime((
unsigned long long)tv.tv_sec * 1000000 + (
unsigned long long)tv.tv_usec);
345 edm::LogInfo(
"DAQSource") <<
"New lumi section was opened. LUMI -: " << lumiSection;
356 dataMode_->setDataBlockInitialized(
false);
383 std::unique_lock<std::mutex> lkw(
mWakeup_);
393 throw cms::Exception(
"DAQSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
433 <<
" but according to BU JSON there should be " <<
currentFile_->nEvents_ <<
" events";
453 throw cms::Exception(
"DAQSource::getNextEvent") <<
"Premature end of input file while reading file header";
470 throw cms::Exception(
"DAQSource::getNextEvent") <<
"Premature end of input file while reading event header";
485 unsigned char* dataPosition;
498 throw cms::Exception(
"DAQSource::getNextEvent") <<
"Premature end of input file while reading event data";
508 if (currentChunkSize -
currentFile_->chunkPosition_ < msgSize) {
527 chunkEnd =
currentFile_->advance(dataPosition, msgSize);
556 bool fileIsBeingProcessed =
false;
559 fileIsBeingProcessed =
true;
592 uint32_t lockCount = 0;
595 bool requireHeader =
dataMode_->requireHeader();
604 bool copy_active =
false;
622 std::unique_lock<std::mutex> lkw(
mWakeup_);
627 LogDebug(
"DAQSource") <<
"No free chunks or threads...";
643 int64_t fileSizeFromMetadata;
649 bool fitToBuffer =
dataMode_->fitToBuffer();
652 uint16_t rawHeaderSize = 0;
653 uint32_t lsFromRaw = 0;
654 int32_t serverEventsInNewFile = -1;
668 unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
670 bool hasDiscardedLumi =
false;
673 edm::LogWarning(
"DAQSource") <<
"Source detected that the lumisection is discarded -: " <<
i;
674 hasDiscardedLumi =
true;
678 if (hasDiscardedLumi)
683 edm::LogWarning(
"DAQSource") <<
"Input throttled detected, reading files is paused...";
700 uint16_t rawDataType;
706 serverEventsInNewFile,
707 fileSizeFromMetadata,
723 serverEventsInNewFile,
724 fileSizeFromMetadata,
736 if (thisLockWaitTimeUs > 0.)
737 sumLockWaitTimeUs += thisLockWaitTimeUs;
745 sumLockWaitTimeUs = 0;
766 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
777 }
else if (
ls < 100) {
781 for (
unsigned int nextLS =
std::min(lsToStart,
ls); nextLS <=
ls; nextLS++) {
800 <<
". Aborting execution." << std::endl;
813 if (!(dbgcount % 20))
814 LogDebug(
"DAQSource") <<
"No file for me... sleep and try again...";
816 backoff_exp =
std::min(4, backoff_exp);
818 int sleeptime = (
int)(100000. *
pow(2, backoff_exp));
827 LogDebug(
"DAQSource") <<
"The director says to grab -: " << nextFile;
834 int stat_res =
stat(rawFile.c_str(), &st);
835 if (stat_res == -1) {
836 edm::LogError(
"DAQSource") <<
"Can not stat file (" << errno <<
"):-" << rawFile << std::endl;
852 eventsInNewFile = -1;
854 eventsInNewFile = serverEventsInNewFile;
855 assert(eventsInNewFile >= 0);
856 assert((eventsInNewFile > 0) ==
857 (fileSize > rawHeaderSize));
860 std::pair<bool, std::vector<std::string>> additionalFiles =
862 if (!additionalFiles.first) {
869 std::unique_ptr<RawInputFile> newInputFile(
new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
881 for (
const auto&
addFile : additionalFiles.second) {
884 unsigned int fcnt = 0;
895 if ((fcnt && fcnt % 3000 == 0) ||
quit_threads_.load(std::memory_order_relaxed)) {
901 std::string secondaryEoR = (secondaryPath / eorName).generic_string();
902 bool prematureEoR =
false;
903 if (
stat(secondaryEoR.c_str(), &bufEoR) == 0) {
906 <<
"EoR file appeared in -: " << secondaryPath <<
" while waiting for index file " <<
addFile;
909 }
else if (
stat(mainEoR.c_str(), &bufEoR) == 0) {
914 <<
"Main EoR file appeared -: " << mainEoR <<
" while waiting for index file " <<
addFile;
937 newInputFile->appendFile(
addFile,
buf.st_size);
938 neededSize +=
buf.st_size;
944 uint16_t neededChunks;
954 newInputFile->setChunks(neededChunks);
956 newInputFile->randomizeOrder(
rng_);
959 auto newInputFilePtr = newInputFile.get();
962 for (
size_t i = 0;
i < neededChunks;
i++) {
964 bool copy_active =
false;
974 unsigned int newTid = 0xffffffff;
984 bool copy_active =
false;
1002 if (newChunk ==
nullptr) {
1004 if (newTid != 0xffffffff)
1013 std::unique_lock<std::mutex> lk(
mReader_);
1016 if (
i == (
uint64_t)neededChunks - 1 && neededSize % chunkSize)
1017 toRead = neededSize % chunkSize;
1018 newChunk->
reset(
i * chunkSize, toRead,
i);
1030 unsigned int numFinishedThreads = 0;
1032 unsigned int tid = 0;
1036 std::unique_lock<std::mutex> lk(
mReader_);
1039 numFinishedThreads++;
1049 threadInit_.exchange(
true, std::memory_order_acquire);
1053 std::unique_lock<std::mutex> lk(
mReader_);
1079 bool fitToBuffer =
dataMode_->fitToBuffer();
1084 for (
auto s :
file->diskFileSizes_)
1089 <<
"maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1091 close(
file->rawFd_);
1095 edm::LogInfo(
"DAQSource") <<
"chunk size was increased to " << (chunk->
size_ >> 20) <<
" MB";
1101 unsigned int bufferLeftInitial = (chunk->
offset_ == 0 &&
file->rawFd_ != -1) ?
file->rawHeaderSize_ : 0;
1104 auto readPrimary = [&](
uint64_t bufferLeft) {
1108 int fileDescriptor = -1;
1109 bool fileOpenedHere =
false;
1112 fileDescriptor =
file->rawFd_;
1114 if (fileDescriptor == -1) {
1115 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1116 fileOpenedHere =
true;
1120 fileDescriptor =
file->rawFd_;
1122 if (fileDescriptor == -1) {
1123 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1124 fileOpenedHere =
true;
1127 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1128 fileOpenedHere =
true;
1132 if (fileDescriptor == -1) {
1133 edm::LogError(
"DAQSource") <<
"readWorker failed to open file -: " <<
file->fileName_
1134 <<
" fd:" << fileDescriptor <<
" error: " << strerror(errno);
1139 if (fileOpenedHere) {
1140 off_t
pos = lseek(fileDescriptor, chunk->
offset_, SEEK_SET);
1142 edm::LogError(
"DAQSource") <<
"readWorker failed to seek file -: " <<
file->fileName_
1143 <<
" fd:" << fileDescriptor <<
" to offset " << chunk->
offset_ 1144 <<
" error: " << strerror(errno);
1150 LogDebug(
"DAQSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " <<
file->fileName_
1151 <<
" at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1155 for (
unsigned int i = 0;
i < readBlocks;
i++) {
1157 edm::LogInfo(
"DAQSource") <<
"readWorker read -: " << (int64_t)(chunk->
usedSize_ - bufferLeft) <<
" or " 1162 (
void*)(chunk->
buf_ + bufferLeft),
1166 edm::LogError(
"DAQSource") <<
"readWorker failed to read file -: " <<
file->fileName_
1167 <<
" fd:" << fileDescriptor <<
" last: " <<
last <<
" error: " << strerror(errno);
1179 edm::LogError(
"DAQSource") <<
"readWorker failed to read file -: " <<
file->fileName_
1180 <<
" fd:" << fileDescriptor <<
" last:" <<
last 1181 <<
" expectedChunkSize:" << chunk->
usedSize_ 1183 <<
" skipped:" <<
skipped <<
" block:" << (
i + 1) <<
"/" << readBlocks
1184 <<
" error: " << strerror(errno);
1193 file->fileSizes_[0] = bufferLeft;
1195 if (chunk->
offset_ + bufferLeft ==
file->diskFileSizes_[0] || bufferLeft == chunk->
size_) {
1198 close(fileDescriptor);
1199 fileDescriptor = -1;
1201 assert(fileDescriptor == -1);
1203 if (fitToBuffer && bufferLeft !=
file->diskFileSizes_[0]) {
1204 edm::LogError(
"DAQSource") <<
"mismatch between read file size for file -: " <<
file->fileNames_[0]
1205 <<
" read:" << bufferLeft <<
" expected:" <<
file->diskFileSizes_[0];
1212 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1213 LogDebug(
"DAQSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" 1214 <<
" in " << msec.count() <<
" ms (" << (bufferLeft >> 20) /
double(msec.count())
1220 auto readSecondary = [&](
uint64_t bufferLeft,
unsigned int j) {
1224 int fileDescriptor = open(
addFile.c_str(), O_RDONLY);
1226 if (fileDescriptor < 0) {
1227 edm::LogError(
"DAQSource") <<
"readWorker failed to open file -: " <<
addFile <<
" fd:" << fileDescriptor
1228 <<
" error: " << strerror(errno);
1233 LogDebug(
"DAQSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " <<
addFile <<
" at offset " 1234 << lseek(fileDescriptor, 0, SEEK_CUR);
1238 for (
unsigned int i = 0;
i < readBlocks;
i++) {
1244 (
void*)(chunk->
buf_ + bufferLeft),
1248 edm::LogError(
"DAQSource") <<
"readWorker failed to read file -: " <<
addFile <<
" fd:" << fileDescriptor
1249 <<
" error: " << strerror(errno);
1251 close(fileDescriptor);
1261 close(fileDescriptor);
1262 file->fileSizes_[
j] = fileLen;
1265 if (fitToBuffer && fileLen !=
file->diskFileSizes_[
j]) {
1266 edm::LogError(
"DAQSource") <<
"mismatch between read file size for file -: " <<
file->fileNames_[
j]
1267 <<
" read:" << fileLen <<
" expected:" <<
file->diskFileSizes_[
j];
1274 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1275 LogDebug(
"DAQSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" 1276 <<
" in " << msec.count() <<
" ms (" << (bufferLeft >> 20) /
double(msec.count())
1281 for (
unsigned int j :
file->fileOrder_) {
1283 readPrimary(bufferLeftInitial);
1285 readSecondary(
file->bufferOffsets_[
j],
j);
1308 throw cms::Exception(
"DAQSource:threadError") <<
" file reader thread error ";
1335 if (currentLeft <
size) {
1374 std::pair<bool, unsigned int>
ret(
true, itr->second);
1379 return std::pair<bool, unsigned int>(
false, 0);
1384 if (
a.rfind(
'/') != std::string::npos)
1385 a =
a.substr(
a.rfind(
'/'));
1386 if (
b.rfind(
'/') != std::string::npos)
1387 b =
b.substr(
b.rfind(
'/'));
1395 if (fileStem.find(
"file://") == 0)
1396 fileStem = fileStem.substr(7);
1397 else if (fileStem.find(
"file:") == 0)
1398 fileStem = fileStem.substr(5);
1399 auto end = fileStem.find(
'_');
1401 if (fileStem.find(
"run") == 0) {
1405 long rval = std::stol(runStr);
1406 edm::LogInfo(
"DAQSource") <<
"Autodetected run number in fileListMode -: " << rval;
1419 if (nextFile.find(
"file://") == 0)
1420 nextFile = nextFile.substr(7);
1421 else if (nextFile.find(
"file:") == 0)
1422 nextFile = nextFile.substr(5);
1425 if (fileStem.find(
"ls"))
1426 fileStem = fileStem.substr(fileStem.find(
"ls") + 2);
1427 if (fileStem.find(
'_'))
1428 fileStem = fileStem.substr(0, fileStem.find(
'_'));
1431 ls = std::stoul(fileStem);
1446 return getFile(
ls, nextFile, lockWaitTime);
std::string & buBaseRunDir()
static const char runNumber_[]
std::unique_ptr< RawInputFile > currentFile_
std::string const & runString() const
std::pair< RawInputFile *, InputChunk * > ReaderInfo
edm::RunNumber_t runNumber_
void startedLookingForFile()
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
std::unique_ptr< std::thread > readSupervisorThread_
std::default_random_engine rng_
std::map< unsigned int, unsigned int > sourceEventsReport_
std::condition_variable startupCv_
ret
prodAgent to be discontinued
std::atomic< bool > threadInit_
unsigned int loopModeIterationInc_
Next checkNext() override
void read(edm::EventPrincipal &eventPrincipal) override
volatile std::atomic< bool > shutdown_flag
bool lumisectionDiscarded(unsigned int ls)
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
bool isSingleStreamThread()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::mutex fileDeleteLock_
const std::string dataModeConfig_
std::vector< std::thread * > workerThreads_
Log< level::Error, false > LogError
unsigned int numConcurrentLumis() const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
StreamID streamID() const
bool isExceptionOnData(unsigned int ls)
edm::ProcessHistoryID processHistoryID_
unsigned int currentLumiSection_
void overrideRunNumber(unsigned int run)
std::atomic< unsigned int > readingFilesCount_
evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock()
std::string getEoRFilePathOnFU() const
U second(std::pair< T, U > const &p)
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
static Timestamp beginOfTime()
DAQSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
uint64_t eventChunkBlock_
evf::EvFDaqDirector::FileStatus getNextDataBlock()
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
int currentLumiSection() const
unsigned int fileListIndex_
unsigned int eventsThisLumi_
const bool fileListLoopMode_
std::condition_variable cvWakeup_
std::vector< std::condition_variable * > cvReader_
unsigned int maxBufferedFiles_
evf::FastMonitoringService * fms_
void readWorker(unsigned int tid)
std::atomic< bool > quit_threads_
const bool alwaysStartFromFirstLS_
std::shared_ptr< DataMode > dataMode_
tbb::concurrent_queue< std::unique_ptr< RawInputFile > > fileQueue_
void createProcessingNotificationMaybe() const
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
const std::vector< unsigned int > testTCDSFEDRange_
unsigned int getStartLumisectionFromEnv() const
def getRunNumber(filename)
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
std::vector< std::string > listFileNames_
std::vector< int > streamFileTracker_
std::string getEoRFileName() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::vector< unsigned int > tid_active_
void stoppedLookingForFile(unsigned int lumi)
std::vector< unsigned int > thread_quit_signal
void setMonStateSup(evf::FastMonState::InputState state)
friend class RawInputFile
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
unsigned int getLumisectionToStart() const
bool startedSupervisorThread_
evf::EvFDaqDirector * daqDirector_
tbb::concurrent_queue< InputChunk * > freeChunks_
unsigned int numConcurrentReads_
int addFile(MEStore µmes, int fd)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
tbb::concurrent_queue< unsigned int > workerPool_
Log< level::Warning, false > LogWarning
std::vector< ReaderInfo > workerJob_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint64_t &lockWaitTime)
void setMonState(evf::FastMonState::InputState state)
bool exceptionDetected() const
void setFMS(evf::FastMonitoringService *fms)
const bool verifyChecksum_
Power< A, B >::type pow(const A &a, const B &b)
std::vector< std::string > const & getBUBaseDirs() const
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
void setInState(FastMonState::InputState inputState)