16 #include <boost/algorithm/string.hpp> 17 #include <boost/filesystem/fstream.hpp> 52 #include <boost/lexical_cast.hpp> 58 edm::RawInputSource(pset, desc),
59 defPath_(pset.getUntrackedParameter<
std::
string> (
"buDefPath",
std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
60 eventChunkSize_(pset.getUntrackedParameter<unsigned
int> (
"eventChunkSize",32)*1048576),
61 eventChunkBlock_(pset.getUntrackedParameter<unsigned
int> (
"eventChunkBlock",32)*1048576),
62 numBuffers_(pset.getUntrackedParameter<unsigned
int> (
"numBuffers",2)),
63 maxBufferedFiles_(pset.getUntrackedParameter<unsigned
int> (
"maxBufferedFiles",2)),
64 getLSFromFilename_(pset.getUntrackedParameter<
bool> (
"getLSFromFilename",
true)),
65 verifyAdler32_(pset.getUntrackedParameter<
bool> (
"verifyAdler32",
true)),
66 verifyChecksum_(pset.getUntrackedParameter<
bool> (
"verifyChecksum",
true)),
67 useL1EventID_(pset.getUntrackedParameter<
bool> (
"useL1EventID",
false)),
69 fileListMode_(pset.getUntrackedParameter<
bool> (
"fileListMode",
false)),
70 fileListLoopMode_(pset.getUntrackedParameter<
bool> (
"fileListLoopMode",
false)),
76 currentLumiSection_(0),
82 gethostname(thishost, 255);
83 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: " 85 <<
" MB on host " << thishost;
87 long autoRunNumber = -1;
92 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Run number not found from filename";
105 std::string defPathSuffix =
"src/EventFilter/Utilities/plugins/budef.jsd";
117 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
126 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
127 "no reading enabled with numBuffers parameter 0";
134 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
141 edm::LogInfo(
"FedRawDataInputSource") <<
"No FastMonitoringService found in the configuration";
147 throw cms::Exception(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
153 cms::Exception(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
176 cvReader_.push_back(
new std::condition_variable);
178 threadInit_.store(
false,std::memory_order_release);
201 std::unique_lock<std::mutex> lk(
mReader_);
222 desc.
setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk");
223 desc.
addUntracked<
unsigned int> (
"eventChunkSize",32)->setComment(
"Input buffer (chunk) size");
224 desc.
addUntracked<
unsigned int> (
"eventChunkBlock",32)->setComment(
"Block size used in a single file read call (must be smaller or equal to buffer size)");
225 desc.
addUntracked<
unsigned int> (
"numBuffers",2)->setComment(
"Number of buffers used for reading input");
226 desc.
addUntracked<
unsigned int> (
"maxBufferedFiles",2)->setComment(
"Maximum number of simultaneously buffered raw files");
227 desc.
addUntracked<
bool> (
"verifyAdler32",
true)->setComment(
"Verify event Adler32 checksum with FRDv3 or v4");
228 desc.
addUntracked<
bool> (
"verifyChecksum",
true)->setComment(
"Verify event CRC-32C checksum of FRDv5 or higher");
229 desc.
addUntracked<
bool> (
"useL1EventID",
false)->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
230 desc.
addUntracked<
bool> (
"fileListMode",
false)->setComment(
"Use fileNames parameter to directly specify raw files to open");
231 desc.
addUntracked<std::vector<std::string>> (
"fileNames", std::vector<std::string>())->
setComment(
"file list used when fileListMode is enabled");
233 descriptions.
add(
"source", desc);
262 bool found = (
stat(fuEoLS.c_str(), &buf) == 0);
265 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
280 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
318 if (checkIfExists==
false ||
stat(fuBoLS.c_str(), &buf) != 0) {
319 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
333 bool found = (
stat(fuEoLS.c_str(), &buf) == 0);
336 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
349 gettimeofday(&tv,
nullptr);
350 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
355 lumiSection, lsopentime,
361 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
386 std::unique_lock<std::mutex> lkw(
mWakeup_);
400 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
456 <<
" but according to BU JSON there should be " 461 std::unique_lock<std::mutex> lkw(
mWakeup_);
484 "Premature end of input file while reading event header";
504 if (detectedFRDversion_==0) {
505 detectedFRDversion_=*((
uint32*)dataPosition);
506 if (detectedFRDversion_>5)
509 assert(detectedFRDversion_>=1);
517 "Premature end of input file while reading event header";
524 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
525 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
526 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
534 "Premature end of input file while reading event data";
560 unsigned char *dataPosition;
568 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
569 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
570 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
578 "Premature end of input file while reading event data";
612 if ( crc !=
event_->crc32c() ) {
615 "Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
616 " but calculated 0x" << crc;
621 uint32_t adler = adler32(0
L,Z_NULL,0);
622 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
624 if ( adler !=
event_->adler32() ) {
627 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
628 " but calculated 0x" << adler;
649 catch (
const boost::filesystem::filesystem_error& ex)
651 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
652 <<
". Trying again.";
657 catch (
const boost::filesystem::filesystem_error&) {}
661 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
662 <<
". Trying again.";
723 bool fileIsBeingProcessed =
false;
726 fileIsBeingProcessed =
true;
730 if (!fileIsBeingProcessed) {
749 gettimeofday(&stv,
nullptr);
751 time = (time << 32) + stv.tv_usec;
754 uint32_t eventSize =
event_->eventSize();
755 char*
event = (
char*)
event_->payload();
758 while (eventSize > 0) {
759 assert(eventSize>=
sizeof(
fedt_t));
760 eventSize -=
sizeof(
fedt_t);
763 assert(eventSize>=fedSize -
sizeof(
fedt_t));
764 eventSize -= (fedSize -
sizeof(
fedt_t));
769 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
782 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
792 memcpy(fedData.
data(),
event + eventSize, fedSize);
794 assert(eventSize == 0);
807 std::ostringstream fileNameWithPID;
808 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid" 809 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
810 jsonDestPath /= fileNameWithPID.str();
812 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to " 817 catch (
const boost::filesystem::filesystem_error& ex)
820 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
831 catch (
const boost::filesystem::filesystem_error& ex)
834 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
839 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
842 boost::filesystem::ifstream ij(jsonDestPath);
846 std::stringstream ss;
848 if (!reader.
parse(ss.str(), deserializeRoot)) {
849 edm::LogError(
"FedRawDataInputSource") <<
"Failed to deserialize JSON file -: " << jsonDestPath
851 <<
"CONTENT:\n" << ss.str()<<
".";
852 throw std::runtime_error(
"Cannot deserialize input JSON file");
871 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
872 " error reading number of events from BU JSON -: No input value " <<
data;
874 return boost::lexical_cast<
int>(
data);
877 catch (
const boost::filesystem::filesystem_error& ex)
881 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
883 catch (std::runtime_error
e)
887 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
890 catch( boost::bad_lexical_cast
const& ) {
891 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. " 892 <<
"Input value is -: " <<
data;
899 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
912 unsigned int currentLumiSection = 0;
922 uint32_t lockCount=0;
933 bool copy_active=
false;
934 for (
auto j :
tid_active_)
if (j) copy_active=
true;
949 std::unique_lock<std::mutex> lkw(
mWakeup_);
954 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
985 status =
getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
995 if (thisLockWaitTimeUs>0.)
996 sumLockWaitTimeUs+=thisLockWaitTimeUs;
1003 sumLockWaitTimeUs=0;
1024 currentLumiSection =
ls;
1029 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
1039 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
1045 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
1049 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
1052 int stat_res =
stat(rawFile.c_str(),&st);
1054 edm::LogError(
"FedRawDataInputSource") <<
"Can not stat file (" << errno <<
"):-"<< rawFile << std::endl;
1058 fileSize=st.st_size;
1065 int eventsInNewFile;
1067 if (fileSize==0) eventsInNewFile=0;
1068 else eventsInNewFile=-1;
1072 assert( eventsInNewFile>=0 );
1073 assert((eventsInNewFile>0) == (fileSize>0));
1081 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
1085 for (
unsigned int i=0;
i<neededChunks;
i++) {
1088 bool copy_active=
false;
1089 for (
auto j :
tid_active_)
if (j) copy_active=
true;
1096 unsigned int newTid = 0xffffffff;
1102 bool copy_active=
false;
1103 for (
auto j :
tid_active_)
if (j) copy_active=
true;
1115 if (newChunk ==
nullptr) {
1123 std::unique_lock<std::mutex> lk(
mReader_);
1126 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
1127 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
1137 if (!eventsInNewFile) {
1139 std::unique_lock<std::mutex> lkw(
mWakeup_);
1140 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
1149 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
1151 std::unique_lock<std::mutex> lkw(
mWakeup_);
1155 newChunk->
reset(0,toRead,0);
1159 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1160 newInputFile->
chunks_[0]=newChunk;
1169 unsigned numFinishedThreads = 0;
1172 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1173 std::unique_lock<std::mutex> lk(
mReader_);
1176 numFinishedThreads++;
1187 threadInit_.exchange(
true,std::memory_order_acquire);
1192 std::unique_lock<std::mutex> lk(
mReader_);
1217 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1218 off_t
pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1221 if (fileDescriptor>=0)
1222 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1226 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1227 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1233 unsigned int bufferLeft = 0;
1247 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1248 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1249 close(fileDescriptor);
1262 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1270 while (!waitForChunk(currentChunk_)) {
1272 if (parent_->exceptionState()) parent_->threadError();
1275 dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
1276 size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1278 if (currentLeft < size) {
1281 while (!waitForChunk(currentChunk_+1)) {
1283 if (parent_->exceptionState()) parent_->threadError();
1286 dataPosition-=chunkPosition_;
1287 assert(dataPosition==chunks_[currentChunk_]->buf_);
1288 memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
1289 memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1291 bufferPosition_+=
size;
1292 chunkPosition_=size-currentLeft;
1297 chunkPosition_+=
size;
1298 bufferPosition_+=
size;
1306 assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1307 assert(size - offset < chunks_[currentChunk_]->size_);
1308 memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1309 chunkPosition_+=
size;
1310 bufferPosition_+=
size;
1314 chunkPosition_-=
size;
1315 bufferPosition_-=
size;
1327 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1330 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1336 uint32_t existingSize = 0;
1351 for (uint32_t
i=0;
i<blockcount;
i++) {
1365 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
1379 itr->second+=events;
1389 auto && ret = std::pair<bool,unsigned int>(
true,itr->second);
1395 return std::pair<bool,unsigned int>(
false,0);
1402 if (a.rfind(
"/")!=std::string::npos) a=a.substr(a.rfind(
"/"));
1403 if (
b.rfind(
"/")!=std::string::npos)
b=
b.substr(
b.rfind(
"/"));
1410 auto end = fileStem.find(
"_");
1411 if (fileStem.find(
"run")==0) {
1415 long rval = boost::lexical_cast<
long>(runStr);
1416 edm::LogInfo(
"FedRawDataInputSource") <<
"Autodetected run number in fileListMode -: "<<
rval;
1419 catch( boost::bad_lexical_cast
const& ) {
1420 edm::LogWarning(
"FedRawDataInputSource") <<
"Unable to autodetect run number in fileListMode from file -: "<<
fileName;
1431 if (nextFile.find(
"file://")==0) nextFile=nextFile.substr(7);
1432 else if (nextFile.find(
"file:")==0) nextFile=nextFile.substr(5);
1435 if (fileStem.find(
"ls")) fileStem = fileStem.substr(fileStem.find(
"ls")+2);
1436 if (fileStem.find(
"_")) fileStem = fileStem.substr(0,fileStem.find(
"_"));
1439 ls = boost::lexical_cast<
unsigned int>(fileStem);
1455 return getFile(ls,nextFile,fsize,lockWaitTime);
static const char runNumber_[]
void setComment(std::string const &value)
std::vector< std::string > & getData()
unsigned int getgpshigh(const unsigned char *)
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void setInState(FastMonitoringThread::InputState inputState)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
void setInStateSup(FastMonitoringThread::InputState inputState)
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
void setComment(std::string const &value)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
#define FED_EVSZ_EXTRACT(a)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
StreamID streamID() const
BranchDescription const & branchDescription() const
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
unsigned long long TimeValue_t
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
void setInputSource(FedRawDataInputSource *inputSource)
unsigned long long int rval
def getRunNumber(filename)
unsigned long long uint64_t
void deserialize(Json::Value &root) override
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
bool emptyLumisectionMode() const
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID, bool verifyLumiSection)
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const