16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
50 #include <boost/lexical_cast.hpp>
52 using namespace jsoncollector;
56 edm::RawInputSource(pset, desc),
57 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
58 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",16)*1048576),
59 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",eventChunkSize_/1048576)*1048576),
60 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",1)),
61 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
62 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
63 verifyChecksum_(pset.getUntrackedParameter<bool> (
"verifyChecksum",
true)),
64 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
65 testModeNoBuilderUnit_(edm::Service<evf::
EvFDaqDirector>()->getTestModeNoBuilderUnit()),
71 currentLumiSection_(0),
77 gethostname(thishost, 255);
78 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
80 <<
" MB on host " << thishost;
82 edm::LogInfo(
"FedRawDataInputSource") <<
"Test mode is ON!";
91 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
100 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
101 "no reading enabled with numBuffers parameter 0";
107 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
114 edm::LogWarning(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
125 edm::LogWarning(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
142 cvReader_.push_back(
new std::condition_variable);
143 threadInit_.store(
false,std::memory_order_release);
166 std::unique_lock<std::mutex> lk(
mReader_);
204 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
207 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
222 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
257 if (checkIfExists==
false || stat(fuBoLS.c_str(), &buf) != 0) {
258 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
272 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
275 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
288 gettimeofday(&tv, 0);
289 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
294 lumiSection, lsopentime,
300 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
330 std::unique_lock<std::mutex> lkw(
mWakeup_);
331 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !
currentFile_)
343 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
396 <<
" but according to BU JSON there should be "
401 std::unique_lock<std::mutex> lkw(
mWakeup_);
424 "Premature end of input file while reading event header";
442 if (detectedFRDversion_==0) {
443 detectedFRDversion_=*((
uint32*)dataPosition);
444 if (detectedFRDversion_>5)
447 assert(detectedFRDversion_>=1);
455 "Premature end of input file while reading event header";
462 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
463 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
464 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
472 "Premature end of input file while reading event data";
496 unsigned char *dataPosition;
504 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
505 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
506 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
514 "Premature end of input file while reading event data";
547 if ( crc !=
event_->crc32c() ) {
550 "Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
551 " but calculated 0x" << crc;
556 uint32_t adler = adler32(0
L,Z_NULL,0);
557 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
559 if ( adler !=
event_->adler32() ) {
562 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
563 " but calculated 0x" << adler;
582 catch (
const boost::filesystem::filesystem_error& ex)
584 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
585 <<
". Trying again.";
594 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
595 <<
". Trying again.";
655 bool fileIsBeingProcessed =
false;
658 fileIsBeingProcessed =
true;
662 if (!fileIsBeingProcessed) {
680 gettimeofday(&stv,0);
682 time = (time << 32) + stv.tv_usec;
685 uint32_t eventSize =
event_->eventSize();
686 char*
event = (
char*)
event_->payload();
689 while (eventSize > 0) {
691 eventSize -=
sizeof(
fedt_t);
695 eventSize -= (fedSize -
sizeof(
fedt_t));
700 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
713 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
723 memcpy(fedData.
data(),
event + eventSize, fedSize);
738 std::ostringstream fileNameWithPID;
739 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
740 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
741 jsonDestPath /= fileNameWithPID.str();
743 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
752 catch (
const boost::filesystem::filesystem_error& ex)
755 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
767 catch (
const boost::filesystem::filesystem_error& ex)
770 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
775 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
780 boost::filesystem::ifstream ij(jsonDestPath);
784 if (!reader.
parse(ij, deserializeRoot))
785 throw std::runtime_error(
"Cannot deserialize input JSON file");
803 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
804 " error reading number of events from BU JSON -: No input value " <<
data;
806 return boost::lexical_cast<
int>(
data);
809 catch (
const boost::filesystem::filesystem_error& ex)
813 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
815 catch (std::runtime_error
e)
819 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
822 catch( boost::bad_lexical_cast
const& ) {
823 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. "
824 <<
"Input value is -: " <<
data;
831 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
842 edm::LogInfo(
"FedRawDataInputSource") <<
"Instead of delete, RENAME -: " << fileName
843 <<
" to: " << destination.string();
844 boost::filesystem::rename(source,destination);
845 boost::filesystem::rename(source.replace_extension(
".jsn"),destination.replace_extension(
".jsn"));
853 InputSource::rewind();
866 unsigned int currentLumiSection = 0;
880 std::unique_lock<std::mutex> lkw(
mWakeup_);
882 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
885 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
901 uint32_t lockCount=0;
918 if (thisLockWaitTimeUs>0.)
919 sumLockWaitTimeUs+=thisLockWaitTimeUs;
944 currentLumiSection =
ls;
949 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
958 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
963 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
967 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
970 stat(rawFile.c_str(),&st);
975 assert( eventsInNewFile>=0 );
976 assert((eventsInNewFile>0) == (fileSize>0));
983 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
986 for (
unsigned int i=0;
i<neededChunks;
i++) {
989 unsigned int newTid = 0xffffffff;
1000 if (newChunk ==
nullptr) {
1007 std::unique_lock<std::mutex> lk(
mReader_);
1010 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
1011 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
1021 if (!eventsInNewFile) {
1023 std::unique_lock<std::mutex> lkw(
mWakeup_);
1024 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
1032 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
1034 std::unique_lock<std::mutex> lkw(
mWakeup_);
1038 newChunk->
reset(0,toRead,0);
1042 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1043 newInputFile->
chunks_[0]=newChunk;
1050 unsigned numFinishedThreads = 0;
1053 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1054 std::unique_lock<std::mutex> lk(
mReader_);
1057 numFinishedThreads++;
1068 threadInit_.exchange(
true,std::memory_order_acquire);
1072 std::unique_lock<std::mutex> lk(
mReader_);
1096 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1097 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1100 if (fileDescriptor>=0)
1101 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1105 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1106 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1112 unsigned int bufferLeft = 0;
1126 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1127 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1128 close(fileDescriptor);
1141 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1157 if (currentLeft < size) {
1206 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1209 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1215 uint32_t existingSize = 0;
1230 for (uint32_t
i=0;
i<blockcount;
i++) {
1245 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
static const char runNumber_[]
std::vector< std::string > & getData()
unsigned int getgpshigh(const unsigned char *)
tuple start
Check for commandline option errors.
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
unsigned int get(const unsigned char *, bool)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
tuple path
else: Piece not in the list, fine.
void updateFileIndex(int const &fileIndex)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
unsigned long long uint64_t
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
std::string getBoLSFilePathOnFU(const unsigned int ls) const
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::string getJumpFilePath() const
volatile std::atomic< bool > shutdown_flag false
void setFMS(evf::FastMonitoringService *fms)
static std::string const source
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const