34 dataModeConfig_(
pset.getUntrackedParameter<
std::
string>(
"dataMode")),
35 eventChunkSize_(
uint64_t(
pset.getUntrackedParameter<unsigned
int>(
"eventChunkSize")) << 20),
36 maxChunkSize_(
uint64_t(
pset.getUntrackedParameter<unsigned
int>(
"maxChunkSize")) << 20),
37 eventChunkBlock_(
uint64_t(
pset.getUntrackedParameter<unsigned
int>(
"eventChunkBlock")) << 20),
38 numBuffers_(
pset.getUntrackedParameter<unsigned
int>(
"numBuffers")),
39 maxBufferedFiles_(
pset.getUntrackedParameter<unsigned
int>(
"maxBufferedFiles")),
40 alwaysStartFromFirstLS_(
pset.getUntrackedParameter<
bool>(
"alwaysStartFromFirstLS",
false)),
41 verifyChecksum_(
pset.getUntrackedParameter<
bool>(
"verifyChecksum")),
42 useL1EventID_(
pset.getUntrackedParameter<
bool>(
"useL1EventID")),
43 testTCDSFEDRange_(
pset.getUntrackedParameter<
std::
vector<unsigned
int>>(
"testTCDSFEDRange")),
45 fileListMode_(
pset.getUntrackedParameter<
bool>(
"fileListMode")),
46 fileListLoopMode_(
pset.getUntrackedParameter<
bool>(
"fileListLoopMode",
false)),
49 currentLumiSection_(0),
51 rng_(
std::chrono::system_clock::
now().time_since_epoch().
count()) {
53 gethostname(thishost, 255);
58 throw cms::Exception(
"DAQSource::DAQSource") <<
"maxChunkSize must be equal or larger than eventChunkSize";
63 throw cms::Exception(
"DAQSource::DAQSource") <<
"eventChunkBlock must be equal or smaller than eventChunkSize";
65 edm::LogInfo(
"DAQSource") <<
"Construction. read-ahead chunk size -: " << std::endl
73 throw cms::Exception(
"DAQSource::DAQSource") <<
"Invalid TCDS Test FED range parameter";
91 dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
92 dataMode_->setTesting(
pset.getUntrackedParameter<
bool>(
"testing",
false));
94 long autoRunNumber = -1;
98 if (autoRunNumber < 0)
99 throw cms::Exception(
"DAQSource::DAQSource") <<
"Run number not found from filename";
109 auto& daqProvenanceHelpers =
dataMode_->makeDaqProvenanceHelpers();
110 for (
const auto& daqProvenanceHelper : daqProvenanceHelpers)
123 throw cms::Exception(
"DAQSource::DAQSource") <<
"no reading enabled with numBuffers parameter 0";
130 edm::LogError(
"DAQSource::DAQSource") <<
"Intel crc32c checksum computation unavailable";
137 edm::LogInfo(
"DAQSource") <<
"No FastMonitoringService found in the configuration";
142 throw cms::Exception(
"DAQSource") <<
"FastMonitoringService not found";
150 edm::LogInfo(
"DAQSource") <<
"EvFDaqDirector/Source configured to use file service";
170 cvReader_.push_back(std::make_unique<std::condition_variable>());
198 it->second->unsetDeleteFile();
213 std::unique_lock<std::mutex> lk(
mReader_);
225 desc.setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
226 desc.addUntracked<
std::string>(
"dataMode",
"FRD")->setComment(
"Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
227 desc.addUntracked<
unsigned int>(
"eventChunkSize", 64)->setComment(
"Input buffer (chunk) size");
228 desc.addUntracked<
unsigned int>(
"maxChunkSize", 0)
229 ->setComment(
"Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
230 desc.addUntracked<
unsigned int>(
"eventChunkBlock", 0)
232 "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If " 233 "0 is specified, use chunk size.");
235 desc.addUntracked<
unsigned int>(
"numBuffers", 2)->setComment(
"Number of buffers used for reading input");
236 desc.addUntracked<
unsigned int>(
"maxBufferedFiles", 2)
237 ->setComment(
"Maximum number of simultaneously buffered raw files");
238 desc.addUntracked<
unsigned int>(
"alwaysStartFromfirstLS",
false)
239 ->setComment(
"Force source to start from LS 1 if server provides higher lumisection number");
240 desc.addUntracked<
bool>(
"verifyChecksum",
true)
241 ->setComment(
"Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
242 desc.addUntracked<
bool>(
"useL1EventID",
false)
243 ->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
244 desc.addUntracked<std::vector<unsigned int>>(
"testTCDSFEDRange", std::vector<unsigned int>())
245 ->setComment(
"[min, max] range to search for TCDS FED ID in test setup");
246 desc.addUntracked<
bool>(
"fileListMode",
false)
247 ->setComment(
"Use fileNames parameter to directly specify raw files to open");
248 desc.addUntracked<std::vector<std::string>>(
"fileNames", std::vector<std::string>())
249 ->setComment(
"file list used when fileListMode is enabled");
250 desc.setAllowAnything();
251 descriptions.
add(
"source",
desc);
270 auto nextEvent = [
this]() {
271 auto getNextEvent = [
this]() {
288 switch (nextEvent()) {
297 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
303 edm::LogInfo(
"DAQSource") <<
"----------------RUN ENDED----------------";
334 gettimeofday(&tv,
nullptr);
335 const edm::Timestamp lsopentime((
unsigned long long)tv.tv_sec * 1000000 + (
unsigned long long)tv.tv_usec);
343 edm::LogInfo(
"DAQSource") <<
"New lumi section was opened. LUMI -: " << lumiSection;
354 dataMode_->setDataBlockInitialized(
false);
383 std::unique_lock<std::mutex> lkw(
mWakeup_);
394 throw cms::Exception(
"DAQSource::getNextDataBlock") <<
"Run has been aborted by the input source reader thread";
434 <<
" but according to BU JSON there should be " <<
currentFile_->nEvents_ <<
" events";
454 throw cms::Exception(
"DAQSource::getNextDataBlock") <<
"Premature end of input file while reading file header";
471 <<
"Premature end of input file while reading event header. Missing: " 489 unsigned char* dataPosition;
505 <<
"Premature end of input file (missing:" << (msgSize -
currentFile_->fileSizeLeft())
506 <<
") while parsing block";
514 }
else if (currentChunkSize -
currentFile_->chunkPosition_ < msgSize) {
534 chunkEnd =
currentFile_->advance(dataPosition, msgSize);
568 bool fileIsBeingProcessed =
false;
571 fileIsBeingProcessed =
true;
604 uint32_t lockCount = 0;
607 bool requireHeader =
dataMode_->requireHeader();
616 bool copy_active =
false;
634 std::unique_lock<std::mutex> lkw(
mWakeup_);
640 <<
"No free chunks or threads. Worker pool empty:" <<
workerPool_.empty()
644 LogDebug(
"DAQSource") <<
"No free chunks or threads...";
660 int64_t fileSizeFromMetadata;
666 bool fitToBuffer =
dataMode_->fitToBuffer();
669 uint16_t rawHeaderSize = 0;
670 uint32_t lsFromRaw = 0;
671 int32_t serverEventsInNewFile = -1;
687 bool hasDiscardedLumi =
false;
690 edm::LogWarning(
"DAQSource") <<
"Source detected that the lumisection is discarded -: " <<
i;
691 hasDiscardedLumi =
true;
695 if (hasDiscardedLumi)
700 edm::LogWarning(
"DAQSource") <<
"Input throttled detected, reading files is paused...";
717 uint16_t rawDataType;
723 serverEventsInNewFile,
724 fileSizeFromMetadata,
740 serverEventsInNewFile,
741 fileSizeFromMetadata,
753 if (thisLockWaitTimeUs > 0.)
754 sumLockWaitTimeUs += thisLockWaitTimeUs;
762 sumLockWaitTimeUs = 0;
783 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
794 }
else if (
ls < 100) {
798 for (
unsigned int nextLS =
std::min(lsToStart,
ls); nextLS <=
ls; nextLS++) {
817 <<
". Aborting execution." << std::endl;
830 if (!(dbgcount % 20))
831 LogDebug(
"DAQSource") <<
"No file for me... sleep and try again...";
833 backoff_exp =
std::min(4, backoff_exp);
835 int sleeptime = (
int)(100000. *
pow(2, backoff_exp));
844 LogDebug(
"DAQSource") <<
"The director says to grab -: " << nextFile;
851 int stat_res =
stat(rawFile.c_str(), &st);
852 if (stat_res == -1) {
853 edm::LogError(
"DAQSource") <<
"Can not stat file (" << errno <<
"):-" << rawFile << std::endl;
869 eventsInNewFile = -1;
871 eventsInNewFile = serverEventsInNewFile;
872 assert(eventsInNewFile >= 0);
873 assert((eventsInNewFile > 0) ==
874 (fileSize > rawHeaderSize));
877 std::pair<bool, std::vector<std::string>> additionalFiles =
879 if (!additionalFiles.first) {
886 std::unique_ptr<RawInputFile> newInputFile(
new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
898 for (
const auto&
addFile : additionalFiles.second) {
901 unsigned int fcnt = 0;
912 if ((fcnt && fcnt % 3000 == 0) ||
quit_threads_.load(std::memory_order_relaxed)) {
918 std::string secondaryEoR = (secondaryPath / eorName).generic_string();
919 bool prematureEoR =
false;
920 if (
stat(secondaryEoR.c_str(), &bufEoR) == 0) {
923 <<
"EoR file appeared in -: " << secondaryPath <<
" while waiting for index file " <<
addFile;
926 }
else if (
stat(mainEoR.c_str(), &bufEoR) == 0) {
931 <<
"Main EoR file appeared -: " << mainEoR <<
" while waiting for index file " <<
addFile;
954 newInputFile->appendFile(
addFile,
buf.st_size);
955 neededSize +=
buf.st_size;
961 uint16_t neededChunks;
971 newInputFile->setChunks(neededChunks);
973 newInputFile->randomizeOrder(
rng_);
976 auto newInputFilePtr = newInputFile.get();
979 for (
size_t i = 0;
i < neededChunks;
i++) {
981 bool copy_active =
false;
991 unsigned int newTid = 0xffffffff;
1001 bool copy_active =
false;
1019 if (newChunk ==
nullptr) {
1021 if (newTid != 0xffffffff)
1030 std::unique_lock<std::mutex> lk(
mReader_);
1033 if (
i == (
uint64_t)neededChunks - 1 && neededSize % chunkSize)
1034 toRead = neededSize % chunkSize;
1035 newChunk->
reset(
i * chunkSize, toRead,
i);
1047 unsigned int numFinishedThreads = 0;
1049 unsigned int tid = 0;
1053 std::unique_lock<std::mutex> lk(
mReader_);
1056 numFinishedThreads++;
1066 threadInit_.exchange(
true, std::memory_order_acquire);
1070 std::unique_lock<std::mutex> lk(
mReader_);
1098 bool fitToBuffer =
dataMode_->fitToBuffer();
1103 for (
auto s :
file->diskFileSizes_)
1108 <<
"maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1110 close(
file->rawFd_);
1114 edm::LogInfo(
"DAQSource") <<
"chunk size was increased to " << (chunk->
size_ >> 20) <<
" MB";
1120 unsigned int bufferLeftInitial = (chunk->
offset_ == 0 &&
file->rawFd_ != -1) ?
file->rawHeaderSize_ : 0;
1123 auto readPrimary = [&](
uint64_t bufferLeft) {
1127 int fileDescriptor = -1;
1128 bool fileOpenedHere =
false;
1131 fileDescriptor =
file->rawFd_;
1133 if (fileDescriptor == -1) {
1134 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1135 fileOpenedHere =
true;
1139 fileDescriptor =
file->rawFd_;
1141 if (fileDescriptor == -1) {
1142 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1143 fileOpenedHere =
true;
1146 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1147 fileOpenedHere =
true;
1151 if (fileDescriptor == -1) {
1152 edm::LogError(
"DAQSource") <<
"readWorker failed to open file -: " <<
file->fileName_
1153 <<
" fd:" << fileDescriptor <<
" error: " << strerror(errno);
1158 if (fileOpenedHere) {
1159 off_t
pos = lseek(fileDescriptor, chunk->
offset_, SEEK_SET);
1161 edm::LogError(
"DAQSource") <<
"readWorker failed to seek file -: " <<
file->fileName_
1162 <<
" fd:" << fileDescriptor <<
" to offset " << chunk->
offset_ 1163 <<
" error: " << strerror(errno);
1169 LogDebug(
"DAQSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " <<
file->fileName_
1170 <<
" at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1174 for (
unsigned int i = 0;
i < readBlocks;
i++) {
1176 edm::LogInfo(
"DAQSource") <<
"readWorker read -: " << (int64_t)(chunk->
usedSize_ - bufferLeft) <<
" or " 1181 (
void*)(chunk->
buf_ + bufferLeft),
1185 edm::LogError(
"DAQSource") <<
"readWorker failed to read file -: " <<
file->fileName_
1186 <<
" fd:" << fileDescriptor <<
" last: " <<
last <<
" error: " << strerror(errno);
1198 edm::LogError(
"DAQSource") <<
"readWorker failed to read file -: " <<
file->fileName_
1199 <<
" fd:" << fileDescriptor <<
" last:" <<
last 1200 <<
" expectedChunkSize:" << chunk->
usedSize_ 1202 <<
" skipped:" <<
skipped <<
" block:" << (
i + 1) <<
"/" << readBlocks
1203 <<
" error: " << strerror(errno);
1212 file->fileSizes_[0] = bufferLeft;
1214 if (chunk->
offset_ + bufferLeft ==
file->diskFileSizes_[0] || bufferLeft == chunk->
size_) {
1217 close(fileDescriptor);
1218 fileDescriptor = -1;
1220 assert(fileDescriptor == -1);
1222 if (fitToBuffer && bufferLeft !=
file->diskFileSizes_[0]) {
1223 edm::LogError(
"DAQSource") <<
"mismatch between read file size for file -: " <<
file->fileNames_[0]
1224 <<
" read:" << bufferLeft <<
" expected:" <<
file->diskFileSizes_[0];
1231 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1232 LogDebug(
"DAQSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" 1233 <<
" in " << msec.count() <<
" ms (" << (bufferLeft >> 20) /
double(msec.count())
1239 auto readSecondary = [&](
uint64_t bufferLeft,
unsigned int j) {
1243 int fileDescriptor = open(
addFile.c_str(), O_RDONLY);
1245 if (fileDescriptor < 0) {
1246 edm::LogError(
"DAQSource") <<
"readWorker failed to open file -: " <<
addFile <<
" fd:" << fileDescriptor
1247 <<
" error: " << strerror(errno);
1252 LogDebug(
"DAQSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " <<
addFile <<
" at offset " 1253 << lseek(fileDescriptor, 0, SEEK_CUR);
1257 for (
unsigned int i = 0;
i < readBlocks;
i++) {
1263 (
void*)(chunk->
buf_ + bufferLeft),
1267 edm::LogError(
"DAQSource") <<
"readWorker failed to read file -: " <<
addFile <<
" fd:" << fileDescriptor
1268 <<
" error: " << strerror(errno);
1270 close(fileDescriptor);
1280 close(fileDescriptor);
1281 file->fileSizes_[
j] = fileLen;
1284 if (fitToBuffer && fileLen !=
file->diskFileSizes_[
j]) {
1285 edm::LogError(
"DAQSource") <<
"mismatch between read file size for file -: " <<
file->fileNames_[
j]
1286 <<
" read:" << fileLen <<
" expected:" <<
file->diskFileSizes_[
j];
1293 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1294 LogDebug(
"DAQSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" 1295 <<
" in " << msec.count() <<
" ms (" << (bufferLeft >> 20) /
double(msec.count())
1300 for (
unsigned int j :
file->fileOrder_) {
1302 readPrimary(bufferLeftInitial);
1304 readSecondary(
file->bufferOffsets_[
j],
j);
1327 throw cms::Exception(
"DAQSource:threadError") <<
" file reader thread error ";
1354 if (currentLeft <
size) {
1394 std::pair<bool, unsigned int>
ret(
true, itr->second);
1399 return std::pair<bool, unsigned int>(
false, 0);
1404 if (
a.rfind(
'/') != std::string::npos)
1405 a =
a.substr(
a.rfind(
'/'));
1406 if (
b.rfind(
'/') != std::string::npos)
1407 b =
b.substr(
b.rfind(
'/'));
1415 if (fileStem.find(
"file://") == 0)
1416 fileStem = fileStem.substr(7);
1417 else if (fileStem.find(
"file:") == 0)
1418 fileStem = fileStem.substr(5);
1419 auto end = fileStem.find(
'_');
1421 if (fileStem.find(
"run") == 0) {
1425 long rval = std::stol(runStr);
1426 edm::LogInfo(
"DAQSource") <<
"Autodetected run number in fileListMode -: " << rval;
1439 if (nextFile.find(
"file://") == 0)
1440 nextFile = nextFile.substr(7);
1441 else if (nextFile.find(
"file:") == 0)
1442 nextFile = nextFile.substr(5);
1445 if (fileStem.find(
"ls"))
1446 fileStem = fileStem.substr(fileStem.find(
"ls") + 2);
1447 if (fileStem.find(
'_'))
1448 fileStem = fileStem.substr(0, fileStem.find(
'_'));
1451 ls = std::stoul(fileStem);
1466 return getFile(
ls, nextFile, lockWaitTime);
std::string & buBaseRunDir()
static const char runNumber_[]
std::unique_ptr< RawInputFile > currentFile_
std::string const & runString() const
std::pair< RawInputFile *, InputChunk * > ReaderInfo
edm::RunNumber_t runNumber_
void startedLookingForFile()
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
std::unique_ptr< std::thread > readSupervisorThread_
std::default_random_engine rng_
std::map< unsigned int, unsigned int > sourceEventsReport_
std::condition_variable startupCv_
ret
prodAgent to be discontinued
std::atomic< bool > threadInit_
unsigned int loopModeIterationInc_
Next checkNext() override
void read(edm::EventPrincipal &eventPrincipal) override
volatile std::atomic< bool > shutdown_flag
bool lumisectionDiscarded(unsigned int ls)
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
bool isSingleStreamThread()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::mutex fileDeleteLock_
const std::string dataModeConfig_
std::vector< std::thread * > workerThreads_
Log< level::Error, false > LogError
unsigned int numConcurrentLumis() const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
StreamID streamID() const
bool isExceptionOnData(unsigned int ls)
edm::ProcessHistoryID processHistoryID_
unsigned int currentLumiSection_
void overrideRunNumber(unsigned int run)
std::atomic< unsigned int > readingFilesCount_
evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock()
std::string getEoRFilePathOnFU() const
U second(std::pair< T, U > const &p)
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
static Timestamp beginOfTime()
DAQSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
uint64_t eventChunkBlock_
evf::EvFDaqDirector::FileStatus getNextDataBlock()
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
int currentLumiSection() const
unsigned int fileListIndex_
unsigned int eventsThisLumi_
const bool fileListLoopMode_
std::condition_variable cvWakeup_
unsigned int maxBufferedFiles_
evf::FastMonitoringService * fms_
void readWorker(unsigned int tid)
std::atomic< bool > quit_threads_
const bool alwaysStartFromFirstLS_
std::shared_ptr< DataMode > dataMode_
tbb::concurrent_queue< std::unique_ptr< RawInputFile > > fileQueue_
void createProcessingNotificationMaybe() const
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
const std::vector< unsigned int > testTCDSFEDRange_
unsigned int getStartLumisectionFromEnv() const
def getRunNumber(filename)
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
std::vector< std::string > listFileNames_
std::vector< int > streamFileTracker_
std::string getEoRFileName() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::vector< unsigned int > tid_active_
std::vector< int > const & getBUBaseDirsNSources() const
void stoppedLookingForFile(unsigned int lumi)
std::vector< unsigned int > thread_quit_signal
void setMonStateSup(evf::FastMonState::InputState state)
friend class RawInputFile
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
unsigned int getLumisectionToStart() const
bool startedSupervisorThread_
evf::EvFDaqDirector * daqDirector_
tbb::concurrent_queue< InputChunk * > freeChunks_
unsigned int numConcurrentReads_
int addFile(MEStore µmes, int fd)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
tbb::concurrent_queue< unsigned int > workerPool_
Log< level::Warning, false > LogWarning
std::vector< ReaderInfo > workerJob_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint64_t &lockWaitTime)
void setMonState(evf::FastMonState::InputState state)
bool exceptionDetected() const
void setFMS(evf::FastMonitoringService *fms)
const bool verifyChecksum_
Power< A, B >::type pow(const A &a, const B &b)
std::vector< std::string > const & getBUBaseDirs() const
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
void setInState(FastMonState::InputState inputState)