34 dataModeConfig_(
pset.getUntrackedParameter<
std::
string>(
"dataMode")),
35 eventChunkSize_(
uint64_t(
pset.getUntrackedParameter<unsigned
int>(
"eventChunkSize")) << 20),
36 maxChunkSize_(
uint64_t(
pset.getUntrackedParameter<unsigned
int>(
"maxChunkSize")) << 20),
37 eventChunkBlock_(
uint64_t(
pset.getUntrackedParameter<unsigned
int>(
"eventChunkBlock")) << 20),
38 numBuffers_(
pset.getUntrackedParameter<unsigned
int>(
"numBuffers")),
39 maxBufferedFiles_(
pset.getUntrackedParameter<unsigned
int>(
"maxBufferedFiles")),
40 alwaysStartFromFirstLS_(
pset.getUntrackedParameter<
bool>(
"alwaysStartFromFirstLS",
false)),
41 verifyChecksum_(
pset.getUntrackedParameter<
bool>(
"verifyChecksum")),
42 useL1EventID_(
pset.getUntrackedParameter<
bool>(
"useL1EventID")),
43 testTCDSFEDRange_(
pset.getUntrackedParameter<
std::
vector<unsigned
int>>(
"testTCDSFEDRange")),
45 fileListMode_(
pset.getUntrackedParameter<
bool>(
"fileListMode")),
46 fileListLoopMode_(
pset.getUntrackedParameter<
bool>(
"fileListLoopMode",
false)),
49 currentLumiSection_(0),
51 rng_(
std::chrono::system_clock::
now().time_since_epoch().
count()) {
53 gethostname(thishost, 255);
58 throw cms::Exception(
"DAQSource::DAQSource") <<
"maxChunkSize must be equal or larger than eventChunkSize";
63 throw cms::Exception(
"DAQSource::DAQSource") <<
"eventChunkBlock must be equal or smaller than eventChunkSize";
65 edm::LogInfo(
"DAQSource") <<
"Construction. read-ahead chunk size -: " << std::endl
73 throw cms::Exception(
"DAQSource::DAQSource") <<
"Invalid TCDS Test FED range parameter";
89 dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
90 dataMode_->setTesting(
pset.getUntrackedParameter<
bool>(
"testing",
false));
92 long autoRunNumber = -1;
96 if (autoRunNumber < 0)
97 throw cms::Exception(
"DAQSource::DAQSource") <<
"Run number not found from filename";
106 auto& daqProvenanceHelpers =
dataMode_->makeDaqProvenanceHelpers();
107 for (
const auto& daqProvenanceHelper : daqProvenanceHelpers)
120 throw cms::Exception(
"DAQSource::DAQSource") <<
"no reading enabled with numBuffers parameter 0";
127 edm::LogError(
"DAQSource::DAQSource") <<
"Intel crc32c checksum computation unavailable";
134 edm::LogInfo(
"DAQSource") <<
"No FastMonitoringService found in the configuration";
139 throw cms::Exception(
"DAQSource") <<
"FastMonitoringService not found";
147 edm::LogInfo(
"DAQSource") <<
"EvFDaqDirector/Source configured to use file service";
167 cvReader_.push_back(std::make_unique<std::condition_variable>());
195 it->second->unsetDeleteFile();
210 std::unique_lock<std::mutex> lk(
mReader_);
222 desc.setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
223 desc.addUntracked<
std::string>(
"dataMode",
"FRD")->setComment(
"Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
224 desc.addUntracked<
unsigned int>(
"eventChunkSize", 64)->setComment(
"Input buffer (chunk) size");
225 desc.addUntracked<
unsigned int>(
"maxChunkSize", 0)
226 ->setComment(
"Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
227 desc.addUntracked<
unsigned int>(
"eventChunkBlock", 0)
229 "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If " 230 "0 is specified, use chunk size.");
232 desc.addUntracked<
unsigned int>(
"numBuffers", 2)->setComment(
"Number of buffers used for reading input");
233 desc.addUntracked<
unsigned int>(
"maxBufferedFiles", 2)
234 ->setComment(
"Maximum number of simultaneously buffered raw files");
// Declared as bool under the exact key the constructor reads ("alwaysStartFromFirstLS"),
// so the documented parameter and its default match the one actually consumed.
235 desc.addUntracked<bool>("alwaysStartFromFirstLS", false)
236 ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
237 desc.addUntracked<
bool>(
"verifyChecksum",
true)
238 ->setComment(
"Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
239 desc.addUntracked<
bool>(
"useL1EventID",
false)
240 ->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
241 desc.addUntracked<std::vector<unsigned int>>(
"testTCDSFEDRange", std::vector<unsigned int>())
242 ->setComment(
"[min, max] range to search for TCDS FED ID in test setup");
243 desc.addUntracked<
bool>(
"fileListMode",
false)
244 ->setComment(
"Use fileNames parameter to directly specify raw files to open");
245 desc.addUntracked<std::vector<std::string>>(
"fileNames", std::vector<std::string>())
246 ->setComment(
"file list used when fileListMode is enabled");
247 desc.setAllowAnything();
248 descriptions.
add(
"source",
desc);
267 auto nextEvent = [
this]() {
268 auto getNextEvent = [
this]() {
285 switch (nextEvent()) {
294 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
300 edm::LogInfo(
"DAQSource") <<
"----------------RUN ENDED----------------";
331 gettimeofday(&tv,
nullptr);
332 const edm::Timestamp lsopentime((
unsigned long long)tv.tv_sec * 1000000 + (
unsigned long long)tv.tv_usec);
340 edm::LogInfo(
"DAQSource") <<
"New lumi section was opened. LUMI -: " << lumiSection;
351 dataMode_->setDataBlockInitialized(
false);
378 std::unique_lock<std::mutex> lkw(
mWakeup_);
388 throw cms::Exception(
"DAQSource::getNextDataBlock") <<
"Run has been aborted by the input source reader thread";
428 <<
" but according to BU JSON there should be " <<
currentFile_->nEvents_ <<
" events";
448 throw cms::Exception(
"DAQSource::getNextDataBlock") <<
"Premature end of input file while reading file header";
465 <<
"Premature end of input file while reading event header. Missing: " 480 unsigned char* dataPosition;
496 <<
"Premature end of input file (missing:" << (msgSize -
currentFile_->fileSizeLeft())
497 <<
") while parsing block";
505 }
else if (currentChunkSize -
currentFile_->chunkPosition_ < msgSize) {
524 chunkEnd =
currentFile_->advance(dataPosition, msgSize);
558 bool fileIsBeingProcessed =
false;
561 fileIsBeingProcessed =
true;
594 uint32_t lockCount = 0;
597 bool requireHeader =
dataMode_->requireHeader();
606 bool copy_active =
false;
624 std::unique_lock<std::mutex> lkw(
mWakeup_);
630 <<
"No free chunks or threads. Worker pool empty:" <<
workerPool_.empty()
634 LogDebug(
"DAQSource") <<
"No free chunks or threads...";
650 int64_t fileSizeFromMetadata;
656 bool fitToBuffer =
dataMode_->fitToBuffer();
659 uint16_t rawHeaderSize = 0;
660 uint32_t lsFromRaw = 0;
661 int32_t serverEventsInNewFile = -1;
675 unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
677 bool hasDiscardedLumi =
false;
680 edm::LogWarning(
"DAQSource") <<
"Source detected that the lumisection is discarded -: " <<
i;
681 hasDiscardedLumi =
true;
685 if (hasDiscardedLumi)
690 edm::LogWarning(
"DAQSource") <<
"Input throttled detected, reading files is paused...";
707 uint16_t rawDataType;
713 serverEventsInNewFile,
714 fileSizeFromMetadata,
730 serverEventsInNewFile,
731 fileSizeFromMetadata,
743 if (thisLockWaitTimeUs > 0.)
744 sumLockWaitTimeUs += thisLockWaitTimeUs;
752 sumLockWaitTimeUs = 0;
773 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
784 }
else if (
ls < 100) {
788 for (
unsigned int nextLS =
std::min(lsToStart,
ls); nextLS <=
ls; nextLS++) {
807 <<
". Aborting execution." << std::endl;
820 if (!(dbgcount % 20))
821 LogDebug(
"DAQSource") <<
"No file for me... sleep and try again...";
823 backoff_exp =
std::min(4, backoff_exp);
825 int sleeptime = (
int)(100000. *
pow(2, backoff_exp));
834 LogDebug(
"DAQSource") <<
"The director says to grab -: " << nextFile;
841 int stat_res =
stat(rawFile.c_str(), &st);
842 if (stat_res == -1) {
843 edm::LogError(
"DAQSource") <<
"Can not stat file (" << errno <<
"):-" << rawFile << std::endl;
859 eventsInNewFile = -1;
861 eventsInNewFile = serverEventsInNewFile;
862 assert(eventsInNewFile >= 0);
863 assert((eventsInNewFile > 0) ==
864 (fileSize > rawHeaderSize));
867 std::pair<bool, std::vector<std::string>> additionalFiles =
869 if (!additionalFiles.first) {
876 std::unique_ptr<RawInputFile> newInputFile(
new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
888 for (
const auto&
addFile : additionalFiles.second) {
891 unsigned int fcnt = 0;
902 if ((fcnt && fcnt % 3000 == 0) ||
quit_threads_.load(std::memory_order_relaxed)) {
908 std::string secondaryEoR = (secondaryPath / eorName).generic_string();
909 bool prematureEoR =
false;
910 if (
stat(secondaryEoR.c_str(), &bufEoR) == 0) {
913 <<
"EoR file appeared in -: " << secondaryPath <<
" while waiting for index file " <<
addFile;
916 }
else if (
stat(mainEoR.c_str(), &bufEoR) == 0) {
921 <<
"Main EoR file appeared -: " << mainEoR <<
" while waiting for index file " <<
addFile;
944 newInputFile->appendFile(
addFile,
buf.st_size);
945 neededSize +=
buf.st_size;
951 uint16_t neededChunks;
961 newInputFile->setChunks(neededChunks);
963 newInputFile->randomizeOrder(
rng_);
966 auto newInputFilePtr = newInputFile.get();
969 for (
size_t i = 0;
i < neededChunks;
i++) {
971 bool copy_active =
false;
981 unsigned int newTid = 0xffffffff;
991 bool copy_active =
false;
1009 if (newChunk ==
nullptr) {
1011 if (newTid != 0xffffffff)
1020 std::unique_lock<std::mutex> lk(
mReader_);
1023 if (
i == (
uint64_t)neededChunks - 1 && neededSize % chunkSize)
1024 toRead = neededSize % chunkSize;
1025 newChunk->
reset(
i * chunkSize, toRead,
i);
1037 unsigned int numFinishedThreads = 0;
1039 unsigned int tid = 0;
1043 std::unique_lock<std::mutex> lk(
mReader_);
1046 numFinishedThreads++;
1056 threadInit_.exchange(
true, std::memory_order_acquire);
1060 std::unique_lock<std::mutex> lk(
mReader_);
1086 bool fitToBuffer =
dataMode_->fitToBuffer();
1091 for (
auto s :
file->diskFileSizes_)
1096 <<
"maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1098 close(
file->rawFd_);
1102 edm::LogInfo(
"DAQSource") <<
"chunk size was increased to " << (chunk->
size_ >> 20) <<
" MB";
1108 unsigned int bufferLeftInitial = (chunk->
offset_ == 0 &&
file->rawFd_ != -1) ?
file->rawHeaderSize_ : 0;
1111 auto readPrimary = [&](
uint64_t bufferLeft) {
1115 int fileDescriptor = -1;
1116 bool fileOpenedHere =
false;
1119 fileDescriptor =
file->rawFd_;
1121 if (fileDescriptor == -1) {
1122 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1123 fileOpenedHere =
true;
1127 fileDescriptor =
file->rawFd_;
1129 if (fileDescriptor == -1) {
1130 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1131 fileOpenedHere =
true;
1134 fileDescriptor = open(
file->fileName_.c_str(), O_RDONLY);
1135 fileOpenedHere =
true;
1139 if (fileDescriptor == -1) {
1140 edm::LogError(
"DAQSource") <<
"readWorker failed to open file -: " <<
file->fileName_
1141 <<
" fd:" << fileDescriptor <<
" error: " << strerror(errno);
1146 if (fileOpenedHere) {
1147 off_t
pos = lseek(fileDescriptor, chunk->
offset_, SEEK_SET);
1149 edm::LogError(
"DAQSource") <<
"readWorker failed to seek file -: " <<
file->fileName_
1150 <<
" fd:" << fileDescriptor <<
" to offset " << chunk->
offset_ 1151 <<
" error: " << strerror(errno);
1157 LogDebug(
"DAQSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " <<
file->fileName_
1158 <<
" at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1162 for (
unsigned int i = 0;
i < readBlocks;
i++) {
1164 edm::LogInfo(
"DAQSource") <<
"readWorker read -: " << (int64_t)(chunk->
usedSize_ - bufferLeft) <<
" or " 1169 (
void*)(chunk->
buf_ + bufferLeft),
1173 edm::LogError(
"DAQSource") <<
"readWorker failed to read file -: " <<
file->fileName_
1174 <<
" fd:" << fileDescriptor <<
" last: " <<
last <<
" error: " << strerror(errno);
1186 edm::LogError(
"DAQSource") <<
"readWorker failed to read file -: " <<
file->fileName_
1187 <<
" fd:" << fileDescriptor <<
" last:" <<
last 1188 <<
" expectedChunkSize:" << chunk->
usedSize_ 1190 <<
" skipped:" <<
skipped <<
" block:" << (
i + 1) <<
"/" << readBlocks
1191 <<
" error: " << strerror(errno);
1200 file->fileSizes_[0] = bufferLeft;
1202 if (chunk->
offset_ + bufferLeft ==
file->diskFileSizes_[0] || bufferLeft == chunk->
size_) {
1205 close(fileDescriptor);
1206 fileDescriptor = -1;
1208 assert(fileDescriptor == -1);
1210 if (fitToBuffer && bufferLeft !=
file->diskFileSizes_[0]) {
1211 edm::LogError(
"DAQSource") <<
"mismatch between read file size for file -: " <<
file->fileNames_[0]
1212 <<
" read:" << bufferLeft <<
" expected:" <<
file->diskFileSizes_[0];
1219 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1220 LogDebug(
"DAQSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" 1221 <<
" in " << msec.count() <<
" ms (" << (bufferLeft >> 20) /
double(msec.count())
1227 auto readSecondary = [&](
uint64_t bufferLeft,
unsigned int j) {
1231 int fileDescriptor = open(
addFile.c_str(), O_RDONLY);
1233 if (fileDescriptor < 0) {
1234 edm::LogError(
"DAQSource") <<
"readWorker failed to open file -: " <<
addFile <<
" fd:" << fileDescriptor
1235 <<
" error: " << strerror(errno);
1240 LogDebug(
"DAQSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " <<
addFile <<
" at offset " 1241 << lseek(fileDescriptor, 0, SEEK_CUR);
1245 for (
unsigned int i = 0;
i < readBlocks;
i++) {
1251 (
void*)(chunk->
buf_ + bufferLeft),
1255 edm::LogError(
"DAQSource") <<
"readWorker failed to read file -: " <<
addFile <<
" fd:" << fileDescriptor
1256 <<
" error: " << strerror(errno);
1258 close(fileDescriptor);
1268 close(fileDescriptor);
1269 file->fileSizes_[
j] = fileLen;
1272 if (fitToBuffer && fileLen !=
file->diskFileSizes_[
j]) {
1273 edm::LogError(
"DAQSource") <<
"mismatch between read file size for file -: " <<
file->fileNames_[
j]
1274 <<
" read:" << fileLen <<
" expected:" <<
file->diskFileSizes_[
j];
1281 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1282 LogDebug(
"DAQSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" 1283 <<
" in " << msec.count() <<
" ms (" << (bufferLeft >> 20) /
double(msec.count())
1288 for (
unsigned int j :
file->fileOrder_) {
1290 readPrimary(bufferLeftInitial);
1292 readSecondary(
file->bufferOffsets_[
j],
j);
1315 throw cms::Exception(
"DAQSource:threadError") <<
" file reader thread error ";
1342 if (currentLeft <
size) {
1382 std::pair<bool, unsigned int>
ret(
true, itr->second);
1387 return std::pair<bool, unsigned int>(
false, 0);
1392 if (
a.rfind(
'/') != std::string::npos)
1393 a =
a.substr(
a.rfind(
'/'));
1394 if (
b.rfind(
'/') != std::string::npos)
1395 b =
b.substr(
b.rfind(
'/'));
1403 if (fileStem.find(
"file://") == 0)
1404 fileStem = fileStem.substr(7);
1405 else if (fileStem.find(
"file:") == 0)
1406 fileStem = fileStem.substr(5);
1407 auto end = fileStem.find(
'_');
1409 if (fileStem.find(
"run") == 0) {
1413 long rval = std::stol(runStr);
1414 edm::LogInfo(
"DAQSource") <<
"Autodetected run number in fileListMode -: " << rval;
1427 if (nextFile.find(
"file://") == 0)
1428 nextFile = nextFile.substr(7);
1429 else if (nextFile.find(
"file:") == 0)
1430 nextFile = nextFile.substr(5);
// std::string::find returns npos (truthy) when nothing matches and 0 (falsy) for a match at the
// very start, so its result must be compared against npos instead of being used as a boolean.
// Strip everything up to and including the "ls" tag, then cut at the next '_' so that only the
// lumisection digits remain for the numeric conversion that follows.
1433 if (fileStem.find("ls") != std::string::npos)
1434 fileStem = fileStem.substr(fileStem.find("ls") + 2);
1435 if (fileStem.find('_') != std::string::npos)
1436 fileStem = fileStem.substr(0, fileStem.find('_'));
1439 ls = std::stoul(fileStem);
1454 return getFile(
ls, nextFile, lockWaitTime);
std::string & buBaseRunDir()
static const char runNumber_[]
std::unique_ptr< RawInputFile > currentFile_
std::string const & runString() const
std::pair< RawInputFile *, InputChunk * > ReaderInfo
edm::RunNumber_t runNumber_
void startedLookingForFile()
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
std::unique_ptr< std::thread > readSupervisorThread_
std::default_random_engine rng_
std::map< unsigned int, unsigned int > sourceEventsReport_
std::condition_variable startupCv_
ret
prodAgent to be discontinued
std::atomic< bool > threadInit_
unsigned int loopModeIterationInc_
Next checkNext() override
void read(edm::EventPrincipal &eventPrincipal) override
volatile std::atomic< bool > shutdown_flag
bool lumisectionDiscarded(unsigned int ls)
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
bool isSingleStreamThread()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::mutex fileDeleteLock_
const std::string dataModeConfig_
std::vector< std::thread * > workerThreads_
Log< level::Error, false > LogError
unsigned int numConcurrentLumis() const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
StreamID streamID() const
bool isExceptionOnData(unsigned int ls)
edm::ProcessHistoryID processHistoryID_
unsigned int currentLumiSection_
void overrideRunNumber(unsigned int run)
std::atomic< unsigned int > readingFilesCount_
evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock()
std::string getEoRFilePathOnFU() const
U second(std::pair< T, U > const &p)
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
static Timestamp beginOfTime()
DAQSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
uint64_t eventChunkBlock_
evf::EvFDaqDirector::FileStatus getNextDataBlock()
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
int currentLumiSection() const
unsigned int fileListIndex_
unsigned int eventsThisLumi_
const bool fileListLoopMode_
std::condition_variable cvWakeup_
unsigned int maxBufferedFiles_
evf::FastMonitoringService * fms_
void readWorker(unsigned int tid)
std::atomic< bool > quit_threads_
const bool alwaysStartFromFirstLS_
std::shared_ptr< DataMode > dataMode_
tbb::concurrent_queue< std::unique_ptr< RawInputFile > > fileQueue_
void createProcessingNotificationMaybe() const
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
const std::vector< unsigned int > testTCDSFEDRange_
unsigned int getStartLumisectionFromEnv() const
def getRunNumber(filename)
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
std::vector< std::string > listFileNames_
std::vector< int > streamFileTracker_
std::string getEoRFileName() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::vector< unsigned int > tid_active_
void stoppedLookingForFile(unsigned int lumi)
std::vector< unsigned int > thread_quit_signal
void setMonStateSup(evf::FastMonState::InputState state)
friend class RawInputFile
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
unsigned int getLumisectionToStart() const
bool startedSupervisorThread_
evf::EvFDaqDirector * daqDirector_
tbb::concurrent_queue< InputChunk * > freeChunks_
unsigned int numConcurrentReads_
int addFile(MEStore &micromes, int fd)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
tbb::concurrent_queue< unsigned int > workerPool_
Log< level::Warning, false > LogWarning
std::vector< ReaderInfo > workerJob_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint64_t &lockWaitTime)
void setMonState(evf::FastMonState::InputState state)
bool exceptionDetected() const
void setFMS(evf::FastMonitoringService *fms)
const bool verifyChecksum_
std::vector< std::string > const & getBUBaseDirs() const
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
void setInState(FastMonState::InputState inputState)