16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
50 #include <boost/lexical_cast.hpp>
52 using namespace jsoncollector;
56 edm::RawInputSource(pset, desc),
57 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
58 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",32)*1048576),
59 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",32)*1048576),
60 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",2)),
61 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> (
"maxBufferedFiles",2)),
62 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
63 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
64 verifyChecksum_(pset.getUntrackedParameter<bool> (
"verifyChecksum",
true)),
65 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
66 runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
67 fuOutputDir_(edm::Service<evf::EvFDaqDirector>()->baseRunDir()),
71 currentLumiSection_(0),
77 gethostname(thishost, 255);
78 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
80 <<
" MB on host " << thishost;
87 std::string defPathSuffix =
"src/EventFilter/Utilities/plugins/budef.jsd";
90 if (stat(
defPath_.c_str(), &statbuf)) {
92 if (stat(
defPath_.c_str(), &statbuf)) {
99 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
108 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
109 "no reading enabled with numBuffers parameter 0";
116 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
123 edm::LogWarning(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
133 edm::LogWarning(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
150 cvReader_.push_back(
new std::condition_variable);
151 threadInit_.store(
false,std::memory_order_release);
174 std::unique_lock<std::mutex> lk(
mReader_);
195 desc.
setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk");
196 desc.
addUntracked<
unsigned int> (
"eventChunkSize",32)->setComment(
"Input buffer (chunk) size");
197 desc.
addUntracked<
unsigned int> (
"eventChunkBlock",32)->setComment(
"Block size used in a single file read call (must be smaller or equal to buffer size)");
198 desc.
addUntracked<
unsigned int> (
"numBuffers",2)->setComment(
"Number of buffers used for reading input");
199 desc.
addUntracked<
unsigned int> (
"maxBufferedFiles",2)->setComment(
"Maximum number of simultaneously buffered raw files");
200 desc.
addUntracked<
bool> (
"verifyAdler32",
true)->setComment(
"Verify event Adler32 checksum with FRDv3 or v4");
201 desc.
addUntracked<
bool> (
"verifyChecksum",
true)->setComment(
"Verify event CRC-32C checksum of FRDv5 or higher");
202 desc.
addUntracked<
bool> (
"useL1EventID",
false)->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
204 descriptions.
add(
"source", desc);
231 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
234 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
249 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
284 if (checkIfExists==
false || stat(fuBoLS.c_str(), &buf) != 0) {
285 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
299 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
302 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
315 gettimeofday(&tv, 0);
316 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
321 lumiSection, lsopentime,
327 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
357 std::unique_lock<std::mutex> lkw(
mWakeup_);
358 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !
currentFile_)
370 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
425 <<
" but according to BU JSON there should be "
430 std::unique_lock<std::mutex> lkw(
mWakeup_);
453 "Premature end of input file while reading event header";
471 if (detectedFRDversion_==0) {
472 detectedFRDversion_=*((
uint32*)dataPosition);
473 if (detectedFRDversion_>5)
476 assert(detectedFRDversion_>=1);
484 "Premature end of input file while reading event header";
491 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
492 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
493 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
501 "Premature end of input file while reading event data";
525 unsigned char *dataPosition;
533 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
534 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
535 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
543 "Premature end of input file while reading event data";
576 if ( crc !=
event_->crc32c() ) {
579 "Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
580 " but calculated 0x" << crc;
585 uint32_t adler = adler32(0
L,Z_NULL,0);
586 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
588 if ( adler !=
event_->adler32() ) {
591 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
592 " but calculated 0x" << adler;
610 catch (
const boost::filesystem::filesystem_error& ex)
612 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
613 <<
". Trying again.";
622 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
623 <<
". Trying again.";
680 bool fileIsBeingProcessed =
false;
683 fileIsBeingProcessed =
true;
687 if (!fileIsBeingProcessed) {
705 gettimeofday(&stv,0);
707 time = (time << 32) + stv.tv_usec;
710 uint32_t eventSize =
event_->eventSize();
711 char*
event = (
char*)
event_->payload();
714 while (eventSize > 0) {
716 eventSize -=
sizeof(
fedt_t);
720 eventSize -= (fedSize -
sizeof(
fedt_t));
725 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
738 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
748 memcpy(fedData.
data(),
event + eventSize, fedSize);
763 std::ostringstream fileNameWithPID;
764 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
765 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
766 jsonDestPath /= fileNameWithPID.str();
768 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
773 catch (
const boost::filesystem::filesystem_error& ex)
776 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
787 catch (
const boost::filesystem::filesystem_error& ex)
790 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
795 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
798 boost::filesystem::ifstream ij(jsonDestPath);
802 if (!reader.
parse(ij, deserializeRoot))
803 throw std::runtime_error(
"Cannot deserialize input JSON file");
821 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
822 " error reading number of events from BU JSON -: No input value " <<
data;
824 return boost::lexical_cast<
int>(
data);
827 catch (
const boost::filesystem::filesystem_error& ex)
831 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
833 catch (std::runtime_error
e)
837 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
840 catch( boost::bad_lexical_cast
const& ) {
841 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. "
842 <<
"Input value is -: " <<
data;
849 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
860 InputSource::rewind();
873 unsigned int currentLumiSection = 0;
887 std::unique_lock<std::mutex> lkw(
mWakeup_);
889 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
892 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
908 uint32_t lockCount=0;
926 if (thisLockWaitTimeUs>0.)
927 sumLockWaitTimeUs+=thisLockWaitTimeUs;
953 currentLumiSection =
ls;
958 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
967 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
972 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
976 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
979 stat(rawFile.c_str(),&st);
984 assert( eventsInNewFile>=0 );
985 assert((eventsInNewFile>0) == (fileSize>0));
992 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
996 for (
unsigned int i=0;
i<neededChunks;
i++) {
999 unsigned int newTid = 0xffffffff;
1010 if (newChunk ==
nullptr) {
1017 std::unique_lock<std::mutex> lk(
mReader_);
1020 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
1021 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
1031 if (!eventsInNewFile) {
1033 std::unique_lock<std::mutex> lkw(
mWakeup_);
1034 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
1043 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
1045 std::unique_lock<std::mutex> lkw(
mWakeup_);
1049 newChunk->
reset(0,toRead,0);
1053 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1054 newInputFile->
chunks_[0]=newChunk;
1062 unsigned numFinishedThreads = 0;
1065 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1066 std::unique_lock<std::mutex> lk(
mReader_);
1069 numFinishedThreads++;
1080 threadInit_.exchange(
true,std::memory_order_acquire);
1084 std::unique_lock<std::mutex> lk(
mReader_);
1108 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1109 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1112 if (fileDescriptor>=0)
1113 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1117 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1118 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1124 unsigned int bufferLeft = 0;
1138 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1139 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1140 close(fileDescriptor);
1153 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1169 if (currentLeft < size) {
1218 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1221 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1227 uint32_t existingSize = 0;
1242 for (uint32_t
i=0;
i<blockcount;
i++) {
1257 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
static const char runNumber_[]
std::vector< std::string > & getData()
unsigned int getgpshigh(const unsigned char *)
tuple start
Check for commandline option errors.
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
void setComment(std::string const &value)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
tuple path
else: Piece not in the list, fine.
void updateFileIndex(int const &fileIndex)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
unsigned long long uint64_t
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
bool emptyLumisectionMode() const
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
volatile std::atomic< bool > shutdown_flag false
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const