16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
50 #include <boost/lexical_cast.hpp>
52 using namespace jsoncollector;
56 edm::RawInputSource(pset, desc),
57 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
58 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",16)*1048576),
59 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",eventChunkSize_/1048576)*1048576),
60 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",1)),
61 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
62 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
63 verifyChecksum_(pset.getUntrackedParameter<bool> (
"verifyChecksum",
true)),
64 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
65 testModeNoBuilderUnit_(edm::Service<evf::
EvFDaqDirector>()->getTestModeNoBuilderUnit()),
71 currentLumiSection_(0),
77 gethostname(thishost, 255);
78 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
80 <<
" MB on host " << thishost;
82 edm::LogInfo(
"FedRawDataInputSource") <<
"Test mode is ON!";
91 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
100 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
101 "no reading enabled with numBuffers parameter 0";
107 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
114 edm::LogWarning(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
125 edm::LogWarning(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
142 cvReader_.push_back(
new std::condition_variable);
143 threadInit_.store(
false,std::memory_order_release);
166 std::unique_lock<std::mutex> lk(
mReader_);
204 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
207 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
222 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
257 if (checkIfExists==
false || stat(fuBoLS.c_str(), &buf) != 0) {
258 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
272 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
275 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
288 gettimeofday(&tv, 0);
289 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
294 lumiSection, lsopentime,
300 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
330 std::unique_lock<std::mutex> lkw(
mWakeup_);
331 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !
currentFile_)
343 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
396 <<
" but according to BU JSON there should be "
401 std::unique_lock<std::mutex> lkw(
mWakeup_);
424 "Premature end of input file while reading event header";
442 if (detectedFRDversion_==0) {
443 detectedFRDversion_=*((
uint32*)dataPosition);
444 if (detectedFRDversion_>5)
447 assert(detectedFRDversion_>=1);
455 "Premature end of input file while reading event header";
462 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
463 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
464 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
472 "Premature end of input file while reading event data";
496 unsigned char *dataPosition;
504 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
505 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
506 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
514 "Premature end of input file while reading event data";
547 if ( crc !=
event_->crc32c() ) {
549 "Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
550 " but calculated 0x" << crc;
555 uint32_t adler = adler32(0
L,Z_NULL,0);
556 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
558 if ( adler !=
event_->adler32() ) {
560 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
561 " but calculated 0x" << adler;
580 catch (
const boost::filesystem::filesystem_error& ex)
582 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
583 <<
". Trying again.";
592 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
593 <<
". Trying again.";
653 bool fileIsBeingProcessed =
false;
656 fileIsBeingProcessed =
true;
660 if (!fileIsBeingProcessed) {
678 gettimeofday(&stv,0);
680 time = (time << 32) + stv.tv_usec;
683 uint32_t eventSize =
event_->eventSize();
684 char*
event = (
char*)
event_->payload();
687 while (eventSize > 0) {
689 eventSize -=
sizeof(
fedt_t);
693 eventSize -= (fedSize -
sizeof(
fedt_t));
698 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
711 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
721 memcpy(fedData.
data(),
event + eventSize, fedSize);
736 std::ostringstream fileNameWithPID;
737 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
738 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
739 jsonDestPath /= fileNameWithPID.str();
741 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
750 catch (
const boost::filesystem::filesystem_error& ex)
753 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
765 catch (
const boost::filesystem::filesystem_error& ex)
768 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
773 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
778 boost::filesystem::ifstream ij(jsonDestPath);
782 if (!reader.
parse(ij, deserializeRoot))
783 throw std::runtime_error(
"Cannot deserialize input JSON file");
801 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
802 " error reading number of events from BU JSON -: No input value " <<
data;
804 return boost::lexical_cast<
int>(
data);
807 catch (
const boost::filesystem::filesystem_error& ex)
811 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
813 catch (std::runtime_error
e)
817 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
820 catch( boost::bad_lexical_cast
const& ) {
821 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. "
822 <<
"Input value is -: " <<
data;
829 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
840 edm::LogInfo(
"FedRawDataInputSource") <<
"Instead of delete, RENAME -: " << fileName
843 boost::filesystem::rename(source.replace_extension(
".jsn"),
destination.replace_extension(
".jsn"));
851 InputSource::rewind();
864 unsigned int currentLumiSection = 0;
878 std::unique_lock<std::mutex> lkw(
mWakeup_);
880 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
883 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
925 currentLumiSection =
ls;
930 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
939 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
944 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
948 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
951 stat(rawFile.c_str(),&st);
956 assert( eventsInNewFile>=0 );
957 assert((eventsInNewFile>0) == (fileSize>0));
964 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
967 for (
unsigned int i=0;
i<neededChunks;
i++) {
970 unsigned int newTid = 0xffffffff;
981 if (newChunk ==
nullptr) {
988 std::unique_lock<std::mutex> lk(
mReader_);
991 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
992 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
1002 if (!eventsInNewFile) {
1004 std::unique_lock<std::mutex> lkw(
mWakeup_);
1005 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
1013 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
1015 std::unique_lock<std::mutex> lkw(
mWakeup_);
1019 newChunk->
reset(0,toRead,0);
1023 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1024 newInputFile->
chunks_[0]=newChunk;
1031 unsigned numFinishedThreads = 0;
1034 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1035 std::unique_lock<std::mutex> lk(
mReader_);
1038 numFinishedThreads++;
1049 threadInit_.exchange(
true,std::memory_order_acquire);
1053 std::unique_lock<std::mutex> lk(
mReader_);
1077 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1078 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1081 if (fileDescriptor>=0)
1082 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1086 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1087 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1093 unsigned int bufferLeft = 0;
1107 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1108 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1109 close(fileDescriptor);
1122 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1138 if (currentLeft < size) {
1187 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1190 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1196 uint32_t existingSize = 0;
1211 for (uint32_t
i=0;
i<blockcount;
i++) {
1226 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
static const char runNumber_[]
std::vector< std::string > & getData()
unsigned int getgpshigh(const unsigned char *)
tuple start
Check for commandline option errors.
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
unsigned int get(const unsigned char *, bool)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
tuple path
else: Piece not in the list, fine.
void updateFileIndex(int const &fileIndex)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
unsigned long long uint64_t
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::string getJumpFilePath() const
volatile std::atomic< bool > shutdown_flag false
void setFMS(evf::FastMonitoringService *fms)
static std::string const source
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
std::string getEoRFilePathOnFU() const