16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
50 #include <boost/lexical_cast.hpp>
52 using namespace jsoncollector;
56 edm::RawInputSource(pset, desc),
57 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
58 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",32)*1048576),
59 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",32)*1048576),
60 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",2)),
61 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> (
"maxBufferedFiles",2)),
62 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
63 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
64 verifyChecksum_(pset.getUntrackedParameter<bool> (
"verifyChecksum",
true)),
65 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
66 runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
67 fuOutputDir_(edm::Service<evf::EvFDaqDirector>()->baseRunDir()),
71 currentLumiSection_(0),
77 gethostname(thishost, 255);
78 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
80 <<
" MB on host " << thishost;
87 std::string defPathSuffix =
"src/EventFilter/Utilities/plugins/budef.jsd";
90 if (stat(
defPath_.c_str(), &statbuf)) {
92 if (stat(
defPath_.c_str(), &statbuf)) {
99 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
108 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
109 "no reading enabled with numBuffers parameter 0";
116 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
121 throw cms::Exception(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
126 cms::Exception(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
146 cvReader_.push_back(
new std::condition_variable);
147 threadInit_.store(
false,std::memory_order_release);
170 std::unique_lock<std::mutex> lk(
mReader_);
191 desc.
setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk");
192 desc.
addUntracked<
unsigned int> (
"eventChunkSize",32)->setComment(
"Input buffer (chunk) size");
193 desc.
addUntracked<
unsigned int> (
"eventChunkBlock",32)->setComment(
"Block size used in a single file read call (must be smaller or equal to buffer size)");
194 desc.
addUntracked<
unsigned int> (
"numBuffers",2)->setComment(
"Number of buffers used for reading input");
195 desc.
addUntracked<
unsigned int> (
"maxBufferedFiles",2)->setComment(
"Maximum number of simultaneously buffered raw files");
196 desc.
addUntracked<
bool> (
"verifyAdler32",
true)->setComment(
"Verify event Adler32 checksum with FRDv3 or v4");
197 desc.
addUntracked<
bool> (
"verifyChecksum",
true)->setComment(
"Verify event CRC-32C checksum of FRDv5 or higher");
198 desc.
addUntracked<
bool> (
"useL1EventID",
false)->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
200 descriptions.
add(
"source", desc);
227 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
230 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
245 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
280 if (checkIfExists==
false || stat(fuBoLS.c_str(), &buf) != 0) {
281 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
295 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
298 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
311 gettimeofday(&tv, 0);
312 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
317 lumiSection, lsopentime,
323 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
353 std::unique_lock<std::mutex> lkw(
mWakeup_);
354 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !
currentFile_)
366 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
421 <<
" but according to BU JSON there should be "
426 std::unique_lock<std::mutex> lkw(
mWakeup_);
449 "Premature end of input file while reading event header";
467 if (detectedFRDversion_==0) {
468 detectedFRDversion_=*((
uint32*)dataPosition);
469 if (detectedFRDversion_>5)
472 assert(detectedFRDversion_>=1);
480 "Premature end of input file while reading event header";
487 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
488 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
489 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
497 "Premature end of input file while reading event data";
521 unsigned char *dataPosition;
529 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
530 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
531 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
539 "Premature end of input file while reading event data";
572 if ( crc !=
event_->crc32c() ) {
575 "Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
576 " but calculated 0x" << crc;
581 uint32_t adler = adler32(0
L,Z_NULL,0);
582 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
584 if ( adler !=
event_->adler32() ) {
587 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
588 " but calculated 0x" << adler;
606 catch (
const boost::filesystem::filesystem_error& ex)
608 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
609 <<
". Trying again.";
614 catch (
const boost::filesystem::filesystem_error&) {}
618 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
619 <<
". Trying again.";
676 bool fileIsBeingProcessed =
false;
679 fileIsBeingProcessed =
true;
683 if (!fileIsBeingProcessed) {
701 gettimeofday(&stv,0);
703 time = (time << 32) + stv.tv_usec;
706 uint32_t eventSize =
event_->eventSize();
707 char*
event = (
char*)
event_->payload();
710 while (eventSize > 0) {
712 eventSize -=
sizeof(
fedt_t);
716 eventSize -= (fedSize -
sizeof(
fedt_t));
721 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
734 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
744 memcpy(fedData.
data(),
event + eventSize, fedSize);
759 std::ostringstream fileNameWithPID;
760 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
761 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
762 jsonDestPath /= fileNameWithPID.str();
764 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
769 catch (
const boost::filesystem::filesystem_error& ex)
772 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
783 catch (
const boost::filesystem::filesystem_error& ex)
786 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
791 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
794 boost::filesystem::ifstream ij(jsonDestPath);
798 if (!reader.
parse(ij, deserializeRoot))
799 throw std::runtime_error(
"Cannot deserialize input JSON file");
817 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
818 " error reading number of events from BU JSON -: No input value " <<
data;
820 return boost::lexical_cast<
int>(
data);
823 catch (
const boost::filesystem::filesystem_error& ex)
827 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
829 catch (std::runtime_error
e)
833 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
836 catch( boost::bad_lexical_cast
const& ) {
837 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. "
838 <<
"Input value is -: " <<
data;
845 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
856 InputSource::rewind();
869 unsigned int currentLumiSection = 0;
883 std::unique_lock<std::mutex> lkw(
mWakeup_);
885 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
888 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
904 uint32_t lockCount=0;
922 if (thisLockWaitTimeUs>0.)
923 sumLockWaitTimeUs+=thisLockWaitTimeUs;
949 currentLumiSection =
ls;
954 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
963 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
968 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
972 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
975 stat(rawFile.c_str(),&st);
980 assert( eventsInNewFile>=0 );
981 assert((eventsInNewFile>0) == (fileSize>0));
988 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
992 for (
unsigned int i=0;
i<neededChunks;
i++) {
995 unsigned int newTid = 0xffffffff;
1006 if (newChunk ==
nullptr) {
1013 std::unique_lock<std::mutex> lk(
mReader_);
1016 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
1017 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
1027 if (!eventsInNewFile) {
1029 std::unique_lock<std::mutex> lkw(
mWakeup_);
1030 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
1039 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
1041 std::unique_lock<std::mutex> lkw(
mWakeup_);
1045 newChunk->
reset(0,toRead,0);
1049 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1050 newInputFile->
chunks_[0]=newChunk;
1058 unsigned numFinishedThreads = 0;
1061 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1062 std::unique_lock<std::mutex> lk(
mReader_);
1065 numFinishedThreads++;
1076 threadInit_.exchange(
true,std::memory_order_acquire);
1080 std::unique_lock<std::mutex> lk(
mReader_);
1104 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1105 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1108 if (fileDescriptor>=0)
1109 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1113 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1114 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1120 unsigned int bufferLeft = 0;
1134 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1135 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1136 close(fileDescriptor);
1149 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1165 if (currentLeft < size) {
1214 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1217 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1223 uint32_t existingSize = 0;
1238 for (uint32_t
i=0;
i<blockcount;
i++) {
1252 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
1266 itr->second+=events;
1276 auto &&
ret = std::pair<bool,unsigned int>(
true,itr->second);
1282 return std::pair<bool,unsigned int>(
false,0);
static const char runNumber_[]
std::vector< std::string > & getData()
tuple ret
prodAgent to be discontinued
unsigned int getgpshigh(const unsigned char *)
tuple start
Check for commandline option errors.
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
void setComment(std::string const &value)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void updateFileIndex(int const &fileIndex)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
void setInputSource(FedRawDataInputSource *inputSource)
unsigned long long uint64_t
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
bool emptyLumisectionMode() const
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
volatile std::atomic< bool > shutdown_flag false
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const