16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
50 #include <boost/lexical_cast.hpp>
52 using namespace jsoncollector;
56 edm::RawInputSource(pset, desc),
57 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
58 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",32)*1048576),
59 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",32)*1048576),
60 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",2)),
61 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> (
"maxBufferedFiles",2)),
62 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
63 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
64 verifyChecksum_(pset.getUntrackedParameter<bool> (
"verifyChecksum",
true)),
65 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
66 fileNames_(pset.getUntrackedParameter<std::vector<std::
string>> (
"fileNames",std::vector<std::
string>())),
67 fileListMode_(pset.getUntrackedParameter<bool> (
"fileListMode",
false)),
68 runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
69 fuOutputDir_(std::
string()),
73 currentLumiSection_(0),
79 gethostname(thishost, 255);
80 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
82 <<
" MB on host " << thishost;
84 long autoRunNumber = -1;
88 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Run number not found from filename";
100 std::string defPathSuffix =
"src/EventFilter/Utilities/plugins/budef.jsd";
103 if (stat(
defPath_.c_str(), &statbuf)) {
105 if (stat(
defPath_.c_str(), &statbuf)) {
112 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
121 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
122 "no reading enabled with numBuffers parameter 0";
129 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
140 throw cms::Exception(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
146 cms::Exception(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
167 cvReader_.push_back(
new std::condition_variable);
168 threadInit_.store(
false,std::memory_order_release);
191 std::unique_lock<std::mutex> lk(
mReader_);
212 desc.
setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk");
213 desc.
addUntracked<
unsigned int> (
"eventChunkSize",32)->setComment(
"Input buffer (chunk) size");
214 desc.
addUntracked<
unsigned int> (
"eventChunkBlock",32)->setComment(
"Block size used in a single file read call (must be smaller or equal to buffer size)");
215 desc.
addUntracked<
unsigned int> (
"numBuffers",2)->setComment(
"Number of buffers used for reading input");
216 desc.
addUntracked<
unsigned int> (
"maxBufferedFiles",2)->setComment(
"Maximum number of simultaneously buffered raw files");
217 desc.
addUntracked<
bool> (
"verifyAdler32",
true)->setComment(
"Verify event Adler32 checksum with FRDv3 or v4");
218 desc.
addUntracked<
bool> (
"verifyChecksum",
true)->setComment(
"Verify event CRC-32C checksum of FRDv5 or higher");
219 desc.
addUntracked<
bool> (
"useL1EventID",
false)->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
220 desc.
addUntracked<
bool> (
"fileListMode",
false)->setComment(
"Use fileNames parameter to directly specify raw files to open");
221 desc.
addUntracked<std::vector<std::string>> (
"fileNames", std::vector<std::string>())->
setComment(
"file list used when fileListMode is enabled");
223 descriptions.
add(
"source", desc);
253 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
256 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
271 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
306 if (checkIfExists==
false || stat(fuBoLS.c_str(), &buf) != 0) {
307 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
321 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
324 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
337 gettimeofday(&tv, 0);
338 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
343 lumiSection, lsopentime,
349 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
379 std::unique_lock<std::mutex> lkw(
mWakeup_);
392 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
447 <<
" but according to BU JSON there should be "
452 std::unique_lock<std::mutex> lkw(
mWakeup_);
475 "Premature end of input file while reading event header";
493 if (detectedFRDversion_==0) {
494 detectedFRDversion_=*((
uint32*)dataPosition);
495 if (detectedFRDversion_>5)
498 assert(detectedFRDversion_>=1);
506 "Premature end of input file while reading event header";
513 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
514 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
515 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
523 "Premature end of input file while reading event data";
547 unsigned char *dataPosition;
555 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
556 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
557 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
565 "Premature end of input file while reading event data";
598 if ( crc !=
event_->crc32c() ) {
601 "Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
602 " but calculated 0x" << crc;
607 uint32_t adler = adler32(0
L,Z_NULL,0);
608 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
610 if ( adler !=
event_->adler32() ) {
613 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
614 " but calculated 0x" << adler;
635 catch (
const boost::filesystem::filesystem_error& ex)
637 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
638 <<
". Trying again.";
643 catch (
const boost::filesystem::filesystem_error&) {}
647 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
648 <<
". Trying again.";
705 bool fileIsBeingProcessed =
false;
708 fileIsBeingProcessed =
true;
712 if (!fileIsBeingProcessed) {
730 gettimeofday(&stv,0);
732 time = (time << 32) + stv.tv_usec;
735 uint32_t eventSize =
event_->eventSize();
736 char*
event = (
char*)
event_->payload();
739 while (eventSize > 0) {
741 eventSize -=
sizeof(
fedt_t);
745 eventSize -= (fedSize -
sizeof(
fedt_t));
750 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
763 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
773 memcpy(fedData.
data(),
event + eventSize, fedSize);
788 std::ostringstream fileNameWithPID;
789 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
790 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
791 jsonDestPath /= fileNameWithPID.str();
793 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
798 catch (
const boost::filesystem::filesystem_error& ex)
801 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
812 catch (
const boost::filesystem::filesystem_error& ex)
815 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
820 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
823 boost::filesystem::ifstream ij(jsonDestPath);
827 if (!reader.
parse(ij, deserializeRoot))
828 throw std::runtime_error(
"Cannot deserialize input JSON file");
846 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
847 " error reading number of events from BU JSON -: No input value " <<
data;
849 return boost::lexical_cast<
int>(
data);
852 catch (
const boost::filesystem::filesystem_error& ex)
856 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
858 catch (std::runtime_error
e)
862 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
865 catch( boost::bad_lexical_cast
const& ) {
866 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. "
867 <<
"Input value is -: " <<
data;
874 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
885 InputSource::rewind();
898 unsigned int currentLumiSection = 0;
908 uint32_t lockCount=0;
917 std::unique_lock<std::mutex> lkw(
mWakeup_);
922 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
949 status =
getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
957 if (thisLockWaitTimeUs>0.)
958 sumLockWaitTimeUs+=thisLockWaitTimeUs;
984 currentLumiSection =
ls;
989 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
998 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
1003 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
1007 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
1010 int stat_res = stat(rawFile.c_str(),&st);
1012 edm::LogError(
"FedRawDataInputSource") <<
"Can not stat file (" << errno <<
"):-"<< rawFile << std::endl;
1017 fileSize=st.st_size;
1020 int eventsInNewFile;
1022 if (fileSize==0) eventsInNewFile=0;
1023 else eventsInNewFile=-1;
1027 assert( eventsInNewFile>=0 );
1028 assert((eventsInNewFile>0) == (fileSize>0));
1042 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
1046 for (
unsigned int i=0;
i<neededChunks;
i++) {
1049 unsigned int newTid = 0xffffffff;
1060 if (newChunk ==
nullptr) {
1067 std::unique_lock<std::mutex> lk(
mReader_);
1070 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
1071 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
1081 if (!eventsInNewFile) {
1083 std::unique_lock<std::mutex> lkw(
mWakeup_);
1084 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
1093 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
1095 std::unique_lock<std::mutex> lkw(
mWakeup_);
1099 newChunk->
reset(0,toRead,0);
1103 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1104 newInputFile->
chunks_[0]=newChunk;
1112 unsigned numFinishedThreads = 0;
1115 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1116 std::unique_lock<std::mutex> lk(
mReader_);
1119 numFinishedThreads++;
1130 threadInit_.exchange(
true,std::memory_order_acquire);
1134 std::unique_lock<std::mutex> lk(
mReader_);
1158 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1159 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1162 if (fileDescriptor>=0)
1163 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1167 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1168 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1174 unsigned int bufferLeft = 0;
1188 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1189 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1190 close(fileDescriptor);
1203 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1219 if (currentLeft < size) {
1268 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1271 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1277 uint32_t existingSize = 0;
1292 for (uint32_t
i=0;
i<blockcount;
i++) {
1306 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
1320 itr->second+=events;
1330 auto &&
ret = std::pair<bool,unsigned int>(
true,itr->second);
1336 return std::pair<bool,unsigned int>(
false,0);
1343 if (a.rfind(
"/")!=std::string::npos) a=a.substr(a.rfind(
"/"));
1344 if (
b.rfind(
"/")!=std::string::npos)
b=
b.substr(
b.rfind(
"/"));
1351 auto end = fileStem.find(
"_");
1352 if (fileStem.find(
"run")==0) {
1356 long rval = boost::lexical_cast<
long>(runStr);
1357 edm::LogInfo(
"FedRawDataInputSource") <<
"Autodetected run number in fileListMode -: "<<
rval;
1360 catch( boost::bad_lexical_cast
const& ) {
1361 edm::LogWarning(
"FedRawDataInputSource") <<
"Unable to autodetect run number in fileListMode from file -: "<<
fileName;
1372 if (nextFile.find(
"file://")==0) nextFile=nextFile.substr(7);
1373 else if (nextFile.find(
"file:")==0) nextFile=nextFile.substr(5);
1376 if (fileStem.find(
"ls")) fileStem = fileStem.substr(fileStem.find(
"ls")+2);
1377 if (fileStem.find(
"_")) fileStem = fileStem.substr(0,fileStem.find(
"_"));
1378 ls = boost::lexical_cast<
unsigned int>(fileStem);
static const char runNumber_[]
void setComment(std::string const &value)
std::vector< std::string > & getData()
tuple ret
prodAgent to be discontinued
unsigned int getgpshigh(const unsigned char *)
tuple start
Check for commandline option errors.
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
void setComment(std::string const &value)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void updateFileIndex(int const &fileIndex)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
void setInputSource(FedRawDataInputSource *inputSource)
unsigned long long int rval
unsigned long long uint64_t
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
std::string getBoLSFilePathOnFU(const unsigned int ls) const
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
bool emptyLumisectionMode() const
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
volatile std::atomic< bool > shutdown_flag false
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const