16 #include <boost/algorithm/string.hpp> 17 #include <boost/filesystem/fstream.hpp> 52 #include <boost/lexical_cast.hpp> 58 edm::RawInputSource(pset, desc),
59 defPath_(pset.getUntrackedParameter<
std::
string> (
"buDefPath",
std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
60 eventChunkSize_(pset.getUntrackedParameter<unsigned
int> (
"eventChunkSize",32)*1048576),
61 eventChunkBlock_(pset.getUntrackedParameter<unsigned
int> (
"eventChunkBlock",32)*1048576),
62 numBuffers_(pset.getUntrackedParameter<unsigned
int> (
"numBuffers",2)),
63 maxBufferedFiles_(pset.getUntrackedParameter<unsigned
int> (
"maxBufferedFiles",2)),
64 getLSFromFilename_(pset.getUntrackedParameter<
bool> (
"getLSFromFilename",
true)),
65 verifyAdler32_(pset.getUntrackedParameter<
bool> (
"verifyAdler32",
true)),
66 verifyChecksum_(pset.getUntrackedParameter<
bool> (
"verifyChecksum",
true)),
67 useL1EventID_(pset.getUntrackedParameter<
bool> (
"useL1EventID",
false)),
69 fileListMode_(pset.getUntrackedParameter<
bool> (
"fileListMode",
false)),
70 fileListLoopMode_(pset.getUntrackedParameter<
bool> (
"fileListLoopMode",
false)),
76 currentLumiSection_(0),
82 gethostname(thishost, 255);
83 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: " 85 <<
" MB on host " << thishost;
87 long autoRunNumber = -1;
92 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Run number not found from filename";
105 std::string defPathSuffix =
"src/EventFilter/Utilities/plugins/budef.jsd";
117 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
126 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
127 "no reading enabled with numBuffers parameter 0";
134 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
141 edm::LogInfo(
"FedRawDataInputSource") <<
"No FastMonitoringService found in the configuration";
147 throw cms::Exception(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
153 cms::Exception(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
176 cvReader_.push_back(
new std::condition_variable);
178 threadInit_.store(
false,std::memory_order_release);
201 std::unique_lock<std::mutex> lk(
mReader_);
222 desc.
setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk");
223 desc.
addUntracked<
unsigned int> (
"eventChunkSize",32)->setComment(
"Input buffer (chunk) size");
224 desc.
addUntracked<
unsigned int> (
"eventChunkBlock",32)->setComment(
"Block size used in a single file read call (must be smaller or equal to buffer size)");
225 desc.
addUntracked<
unsigned int> (
"numBuffers",2)->setComment(
"Number of buffers used for reading input");
226 desc.
addUntracked<
unsigned int> (
"maxBufferedFiles",2)->setComment(
"Maximum number of simultaneously buffered raw files");
227 desc.
addUntracked<
bool> (
"verifyAdler32",
true)->setComment(
"Verify event Adler32 checksum with FRDv3 or v4");
228 desc.
addUntracked<
bool> (
"verifyChecksum",
true)->setComment(
"Verify event CRC-32C checksum of FRDv5 or higher");
229 desc.
addUntracked<
bool> (
"useL1EventID",
false)->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
230 desc.
addUntracked<
bool> (
"fileListMode",
false)->setComment(
"Use fileNames parameter to directly specify raw files to open");
231 desc.
addUntracked<std::vector<std::string>> (
"fileNames", std::vector<std::string>())->
setComment(
"file list used when fileListMode is enabled");
233 descriptions.
add(
"source", desc);
262 bool found = (
stat(fuEoLS.c_str(), &buf) == 0);
265 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
280 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
318 if (checkIfExists==
false ||
stat(fuBoLS.c_str(), &buf) != 0) {
319 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
333 bool found = (
stat(fuEoLS.c_str(), &buf) == 0);
336 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
349 gettimeofday(&tv,
nullptr);
350 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
355 lumiSection, lsopentime,
361 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
386 std::unique_lock<std::mutex> lkw(
mWakeup_);
400 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
456 <<
" but according to BU JSON there should be " 461 std::unique_lock<std::mutex> lkw(
mWakeup_);
484 "Premature end of input file while reading event header";
504 if (detectedFRDversion_==0) {
505 detectedFRDversion_=*((
uint32*)dataPosition);
506 if (detectedFRDversion_>5)
509 assert(detectedFRDversion_>=1);
517 "Premature end of input file while reading event header";
524 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
525 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
526 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
534 "Premature end of input file while reading event data";
560 unsigned char *dataPosition;
568 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
569 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
570 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
578 "Premature end of input file while reading event data";
612 if ( crc !=
event_->crc32c() ) {
615 "Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
616 " but calculated 0x" << crc;
621 uint32_t adler = adler32(0
L,Z_NULL,0);
622 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
624 if ( adler !=
event_->adler32() ) {
627 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
628 " but calculated 0x" << adler;
649 catch (
const boost::filesystem::filesystem_error& ex)
651 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
652 <<
". Trying again.";
657 catch (
const boost::filesystem::filesystem_error&) {}
661 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
662 <<
". Trying again.";
697 static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.
triggerType()),
725 bool fileIsBeingProcessed =
false;
728 fileIsBeingProcessed =
true;
732 if (!fileIsBeingProcessed) {
751 gettimeofday(&stv,
nullptr);
753 time = (time << 32) + stv.tv_usec;
756 uint32_t eventSize =
event_->eventSize();
757 unsigned char*
event = (
unsigned char*)
event_->payload();
760 while (eventSize > 0) {
771 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
784 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
794 memcpy(fedData.
data(),
event + eventSize, fedSize);
796 assert(eventSize == 0);
809 std::ostringstream fileNameWithPID;
810 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid" 811 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
812 jsonDestPath /= fileNameWithPID.str();
814 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to " 819 catch (
const boost::filesystem::filesystem_error& ex)
822 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
833 catch (
const boost::filesystem::filesystem_error& ex)
836 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
841 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
844 boost::filesystem::ifstream ij(jsonDestPath);
848 std::stringstream ss;
850 if (!reader.
parse(ss.str(), deserializeRoot)) {
851 edm::LogError(
"FedRawDataInputSource") <<
"Failed to deserialize JSON file -: " << jsonDestPath
853 <<
"CONTENT:\n" << ss.str()<<
".";
854 throw std::runtime_error(
"Cannot deserialize input JSON file");
873 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
874 " error reading number of events from BU JSON -: No input value " <<
data;
876 return boost::lexical_cast<
int>(
data);
879 catch (
const boost::filesystem::filesystem_error& ex)
883 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
885 catch (std::runtime_error
e)
889 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
892 catch( boost::bad_lexical_cast
const& ) {
893 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. " 894 <<
"Input value is -: " <<
data;
901 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
914 unsigned int currentLumiSection = 0;
924 uint32_t lockCount=0;
935 bool copy_active=
false;
936 for (
auto j :
tid_active_)
if (j) copy_active=
true;
951 std::unique_lock<std::mutex> lkw(
mWakeup_);
956 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
987 status =
getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
997 if (thisLockWaitTimeUs>0.)
998 sumLockWaitTimeUs+=thisLockWaitTimeUs;
1005 sumLockWaitTimeUs=0;
1032 currentLumiSection =
ls;
1037 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
1047 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
1053 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
1057 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
1060 int stat_res =
stat(rawFile.c_str(),&st);
1062 edm::LogError(
"FedRawDataInputSource") <<
"Can not stat file (" << errno <<
"):-"<< rawFile << std::endl;
1066 fileSize=st.st_size;
1073 int eventsInNewFile;
1075 if (fileSize==0) eventsInNewFile=0;
1076 else eventsInNewFile=-1;
1080 assert( eventsInNewFile>=0 );
1081 assert((eventsInNewFile>0) == (fileSize>0));
1089 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
1093 for (
unsigned int i=0;
i<neededChunks;
i++) {
1096 bool copy_active=
false;
1097 for (
auto j :
tid_active_)
if (j) copy_active=
true;
1104 unsigned int newTid = 0xffffffff;
1110 bool copy_active=
false;
1111 for (
auto j :
tid_active_)
if (j) copy_active=
true;
1123 if (newChunk ==
nullptr) {
1131 std::unique_lock<std::mutex> lk(
mReader_);
1134 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
1135 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
1145 if (!eventsInNewFile) {
1147 std::unique_lock<std::mutex> lkw(
mWakeup_);
1148 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
1157 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
1159 std::unique_lock<std::mutex> lkw(
mWakeup_);
1163 newChunk->
reset(0,toRead,0);
1167 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1168 newInputFile->
chunks_[0]=newChunk;
1177 unsigned numFinishedThreads = 0;
1180 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1181 std::unique_lock<std::mutex> lk(
mReader_);
1184 numFinishedThreads++;
1195 threadInit_.exchange(
true,std::memory_order_acquire);
1200 std::unique_lock<std::mutex> lk(
mReader_);
1225 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1226 off_t
pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1229 if (fileDescriptor>=0)
1230 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1234 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1235 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1241 unsigned int bufferLeft = 0;
1255 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1256 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1257 close(fileDescriptor);
1270 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1278 while (!waitForChunk(currentChunk_)) {
1280 if (parent_->exceptionState()) parent_->threadError();
1283 dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
1284 size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1286 if (currentLeft < size) {
1289 while (!waitForChunk(currentChunk_+1)) {
1291 if (parent_->exceptionState()) parent_->threadError();
1294 dataPosition-=chunkPosition_;
1295 assert(dataPosition==chunks_[currentChunk_]->buf_);
1296 memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
1297 memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1299 bufferPosition_+=
size;
1300 chunkPosition_=size-currentLeft;
1305 chunkPosition_+=
size;
1306 bufferPosition_+=
size;
1314 assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1315 assert(size - offset < chunks_[currentChunk_]->size_);
1316 memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1317 chunkPosition_+=
size;
1318 bufferPosition_+=
size;
1322 chunkPosition_-=
size;
1323 bufferPosition_-=
size;
1335 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1338 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1344 uint32_t existingSize = 0;
1359 for (uint32_t
i=0;
i<blockcount;
i++) {
1373 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
1387 itr->second+=events;
1397 auto && ret = std::pair<bool,unsigned int>(
true,itr->second);
1403 return std::pair<bool,unsigned int>(
false,0);
1410 if (a.rfind(
"/")!=std::string::npos) a=a.substr(a.rfind(
"/"));
1411 if (
b.rfind(
"/")!=std::string::npos)
b=
b.substr(
b.rfind(
"/"));
1418 auto end = fileStem.find(
"_");
1419 if (fileStem.find(
"run")==0) {
1423 long rval = boost::lexical_cast<
long>(runStr);
1424 edm::LogInfo(
"FedRawDataInputSource") <<
"Autodetected run number in fileListMode -: "<<
rval;
1427 catch( boost::bad_lexical_cast
const& ) {
1428 edm::LogWarning(
"FedRawDataInputSource") <<
"Unable to autodetect run number in fileListMode from file -: "<<
fileName;
1439 if (nextFile.find(
"file://")==0) nextFile=nextFile.substr(7);
1440 else if (nextFile.find(
"file:")==0) nextFile=nextFile.substr(5);
1443 if (fileStem.find(
"ls")) fileStem = fileStem.substr(fileStem.find(
"ls")+2);
1444 if (fileStem.find(
"_")) fileStem = fileStem.substr(0,fileStem.find(
"_"));
1447 ls = boost::lexical_cast<
unsigned int>(fileStem);
1463 return getFile(ls,nextFile,fsize,lockWaitTime);
static const char runNumber_[]
void setComment(std::string const &value)
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection)
std::vector< std::string > & getData()
unsigned int getgpshigh(const unsigned char *)
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
static const uint32_t length
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void setInState(FastMonitoringThread::InputState inputState)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
void setInStateSup(FastMonitoringThread::InputState inputState)
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
uint32_t fragmentLength() const
The length of the event fragment counted in 64-bit words including header and trailer.
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
void setComment(std::string const &value)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
StreamID streamID() const
BranchDescription const & branchDescription() const
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
unsigned long long TimeValue_t
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
void setInputSource(FedRawDataInputSource *inputSource)
unsigned long long int rval
def getRunNumber(filename)
unsigned long long uint64_t
void deserialize(Json::Value &root) override
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const