16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
50 #include <boost/lexical_cast.hpp>
52 using namespace jsoncollector;
56 edm::RawInputSource(pset, desc),
57 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
58 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",32)*1048576),
59 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",eventChunkSize_/1048576)*1048576),
60 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",2)),
61 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> (
"maxBufferedFiles",2)),
62 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
63 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
64 verifyChecksum_(pset.getUntrackedParameter<bool> (
"verifyChecksum", verifyAdler32_)),
65 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
71 currentLumiSection_(0),
77 gethostname(thishost, 255);
78 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
80 <<
" MB on host " << thishost;
89 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
98 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
99 "no reading enabled with numBuffers parameter 0";
106 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
113 edm::LogWarning(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
123 edm::LogWarning(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
140 cvReader_.push_back(
new std::condition_variable);
141 threadInit_.store(
false,std::memory_order_release);
164 std::unique_lock<std::mutex> lk(
mReader_);
206 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
209 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
224 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
259 if (checkIfExists==
false || stat(fuBoLS.c_str(), &buf) != 0) {
260 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
274 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
277 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
290 gettimeofday(&tv, 0);
291 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
296 lumiSection, lsopentime,
302 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
332 std::unique_lock<std::mutex> lkw(
mWakeup_);
333 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !
currentFile_)
345 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
400 <<
" but according to BU JSON there should be "
405 std::unique_lock<std::mutex> lkw(
mWakeup_);
428 "Premature end of input file while reading event header";
446 if (detectedFRDversion_==0) {
447 detectedFRDversion_=*((
uint32*)dataPosition);
448 if (detectedFRDversion_>5)
451 assert(detectedFRDversion_>=1);
459 "Premature end of input file while reading event header";
466 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
467 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
468 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
476 "Premature end of input file while reading event data";
500 unsigned char *dataPosition;
508 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
509 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
510 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
518 "Premature end of input file while reading event data";
551 if ( crc !=
event_->crc32c() ) {
554 "Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
555 " but calculated 0x" << crc;
560 uint32_t adler = adler32(0
L,Z_NULL,0);
561 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
563 if ( adler !=
event_->adler32() ) {
566 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
567 " but calculated 0x" << adler;
585 catch (
const boost::filesystem::filesystem_error& ex)
587 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
588 <<
". Trying again.";
597 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
598 <<
". Trying again.";
655 bool fileIsBeingProcessed =
false;
658 fileIsBeingProcessed =
true;
662 if (!fileIsBeingProcessed) {
680 gettimeofday(&stv,0);
682 time = (time << 32) + stv.tv_usec;
685 uint32_t eventSize =
event_->eventSize();
686 char*
event = (
char*)
event_->payload();
689 while (eventSize > 0) {
691 eventSize -=
sizeof(
fedt_t);
695 eventSize -= (fedSize -
sizeof(
fedt_t));
700 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
713 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
723 memcpy(fedData.
data(),
event + eventSize, fedSize);
738 std::ostringstream fileNameWithPID;
739 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
740 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
741 jsonDestPath /= fileNameWithPID.str();
743 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
748 catch (
const boost::filesystem::filesystem_error& ex)
751 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
762 catch (
const boost::filesystem::filesystem_error& ex)
765 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
770 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
773 boost::filesystem::ifstream ij(jsonDestPath);
777 if (!reader.
parse(ij, deserializeRoot))
778 throw std::runtime_error(
"Cannot deserialize input JSON file");
796 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
797 " error reading number of events from BU JSON -: No input value " <<
data;
799 return boost::lexical_cast<
int>(
data);
802 catch (
const boost::filesystem::filesystem_error& ex)
806 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
808 catch (std::runtime_error
e)
812 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
815 catch( boost::bad_lexical_cast
const& ) {
816 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. "
817 <<
"Input value is -: " <<
data;
824 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
835 InputSource::rewind();
848 unsigned int currentLumiSection = 0;
862 std::unique_lock<std::mutex> lkw(
mWakeup_);
864 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
867 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
883 uint32_t lockCount=0;
901 if (thisLockWaitTimeUs>0.)
902 sumLockWaitTimeUs+=thisLockWaitTimeUs;
928 currentLumiSection =
ls;
933 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
942 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
947 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
951 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
954 stat(rawFile.c_str(),&st);
959 assert( eventsInNewFile>=0 );
960 assert((eventsInNewFile>0) == (fileSize>0));
967 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
971 for (
unsigned int i=0;
i<neededChunks;
i++) {
974 unsigned int newTid = 0xffffffff;
985 if (newChunk ==
nullptr) {
992 std::unique_lock<std::mutex> lk(
mReader_);
995 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
996 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
1006 if (!eventsInNewFile) {
1008 std::unique_lock<std::mutex> lkw(
mWakeup_);
1009 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
1018 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
1020 std::unique_lock<std::mutex> lkw(
mWakeup_);
1024 newChunk->
reset(0,toRead,0);
1028 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1029 newInputFile->
chunks_[0]=newChunk;
1037 unsigned numFinishedThreads = 0;
1040 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1041 std::unique_lock<std::mutex> lk(
mReader_);
1044 numFinishedThreads++;
1055 threadInit_.exchange(
true,std::memory_order_acquire);
1059 std::unique_lock<std::mutex> lk(
mReader_);
1083 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1084 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1087 if (fileDescriptor>=0)
1088 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1092 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1093 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1099 unsigned int bufferLeft = 0;
1113 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1114 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1115 close(fileDescriptor);
1128 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1144 if (currentLeft < size) {
1193 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1196 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1202 uint32_t existingSize = 0;
1217 for (uint32_t
i=0;
i<blockcount;
i++) {
1232 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
static const char runNumber_[]
std::vector< std::string > & getData()
unsigned int getgpshigh(const unsigned char *)
tuple start
Check for commandline option errors.
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
unsigned int get(const unsigned char *, bool)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void updateFileIndex(int const &fileIndex)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
unsigned long long uint64_t
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
bool emptyLumisectionMode() const
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
volatile std::atomic< bool > shutdown_flag false
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const