16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
50 #include <boost/lexical_cast.hpp>
52 using namespace jsoncollector;
56 edm::RawInputSource(pset, desc),
57 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
58 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",16)*1048576),
59 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",eventChunkSize_/1048576)*1048576),
60 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",1)),
61 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
62 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
63 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
64 testModeNoBuilderUnit_(edm::Service<evf::
EvFDaqDirector>()->getTestModeNoBuilderUnit()),
70 currentLumiSection_(0),
76 gethostname(thishost, 255);
77 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
79 <<
" MB on host " << thishost;
81 edm::LogInfo(
"FedRawDataInputSource") <<
"Test mode is ON!";
90 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
99 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
100 "no reading enabled with numBuffers parameter 0";
110 edm::LogWarning(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
121 edm::LogWarning(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
138 cvReader_.push_back(
new std::condition_variable);
139 threadInit_.store(
false,std::memory_order_release);
162 std::unique_lock<std::mutex> lk(
mReader_);
200 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
203 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
218 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
253 if (checkIfExists==
false || stat(fuBoLS.c_str(), &buf) != 0) {
254 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
268 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
271 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
284 gettimeofday(&tv, 0);
285 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
290 lumiSection, lsopentime,
296 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
312 const size_t headerSize[4] = {0,2*
sizeof(
uint32),(4 + 1024) *
sizeof(
uint32),7*
sizeof(
uint32)};
327 std::unique_lock<std::mutex> lkw(
mWakeup_);
328 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !
currentFile_)
340 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
393 <<
" but according to BU JSON there should be "
398 std::unique_lock<std::mutex> lkw(
mWakeup_);
421 "Premature end of input file while reading event header";
439 if (detectedFRDversion_==0) {
440 detectedFRDversion_=*((
uint32*)dataPosition);
441 assert(detectedFRDversion_>=1 && detectedFRDversion_<=3);
449 "Premature end of input file while reading event header";
456 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
457 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
458 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
466 "Premature end of input file while reading event data";
490 unsigned char *dataPosition;
498 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
499 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
500 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
508 "Premature end of input file while reading event data";
539 uint32_t adler = adler32(0
L,Z_NULL,0);
540 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
542 if ( adler !=
event_->adler32() ) {
544 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
545 " but calculated 0x" << adler;
562 catch (
const boost::filesystem::filesystem_error& ex)
564 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
565 <<
". Trying again.";
574 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
575 <<
". Trying again.";
635 bool fileIsBeingProcessed =
false;
638 fileIsBeingProcessed =
true;
642 if (!fileIsBeingProcessed) {
660 gettimeofday(&stv,0);
662 time = (time << 32) + stv.tv_usec;
665 uint32_t eventSize =
event_->eventSize();
666 char*
event = (
char*)
event_->payload();
669 while (eventSize > 0) {
670 eventSize -=
sizeof(
fedt_t);
673 eventSize -= (fedSize -
sizeof(
fedh_t));
687 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
697 memcpy(fedData.
data(),
event + eventSize, fedSize);
712 std::ostringstream fileNameWithPID;
713 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
714 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
715 jsonDestPath /= fileNameWithPID.str();
717 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
726 catch (
const boost::filesystem::filesystem_error& ex)
729 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
741 catch (
const boost::filesystem::filesystem_error& ex)
744 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
749 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
754 boost::filesystem::ifstream ij(jsonDestPath);
758 if (!reader.
parse(ij, deserializeRoot))
759 throw std::runtime_error(
"Cannot deserialize input JSON file");
777 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
778 " error reading number of events from BU JSON -: No input value " <<
data;
780 return boost::lexical_cast<
int>(
data);
783 catch (
const boost::filesystem::filesystem_error& ex)
787 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
789 catch (std::runtime_error
e)
793 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
796 catch( boost::bad_lexical_cast
const& ) {
797 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. "
798 <<
"Input value is -: " <<
data;
805 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
816 edm::LogInfo(
"FedRawDataInputSource") <<
"Instead of delete, RENAME -: " << fileName
819 boost::filesystem::rename(source.replace_extension(
".jsn"),
destination.replace_extension(
".jsn"));
827 InputSource::rewind();
840 unsigned int currentLumiSection = 0;
854 std::unique_lock<std::mutex> lkw(
mWakeup_);
856 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
859 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
901 currentLumiSection =
ls;
906 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
915 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
920 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
924 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
927 stat(rawFile.c_str(),&st);
932 assert( eventsInNewFile>=0 );
933 assert((eventsInNewFile>0) == (fileSize>0));
940 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
943 for (
unsigned int i=0;
i<neededChunks;
i++) {
946 unsigned int newTid = 0xffffffff;
957 if (newChunk ==
nullptr) {
964 std::unique_lock<std::mutex> lk(
mReader_);
967 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
968 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
978 if (!eventsInNewFile) {
980 std::unique_lock<std::mutex> lkw(
mWakeup_);
981 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
989 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
991 std::unique_lock<std::mutex> lkw(
mWakeup_);
995 newChunk->
reset(0,toRead,0);
999 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1000 newInputFile->
chunks_[0]=newChunk;
1007 unsigned numFinishedThreads = 0;
1010 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1011 std::unique_lock<std::mutex> lk(
mReader_);
1014 numFinishedThreads++;
1025 threadInit_.exchange(
true,std::memory_order_acquire);
1029 std::unique_lock<std::mutex> lk(
mReader_);
1053 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1054 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1057 if (fileDescriptor>=0)
1058 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1062 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1063 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1069 unsigned int bufferLeft = 0;
1083 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1084 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1085 close(fileDescriptor);
1098 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1114 if (currentLeft < size) {
1163 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1166 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1172 uint32_t existingSize = 0;
1187 for (uint32_t
i=0;
i<blockcount;
i++) {
1202 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
static const char runNumber_[]
std::vector< std::string > & getData()
unsigned int getgpshigh(const unsigned char *)
tuple start
Check for commandline option errors.
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
unsigned int get(const unsigned char *, bool)
volatile std::atomic< bool > shutdown_flag
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
tuple path
else: Piece not in the list, fine.
void updateFileIndex(int const &fileIndex)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::string getJumpFilePath() const
volatile std::atomic< bool > shutdown_flag false
void setFMS(evf::FastMonitoringService *fms)
static std::string const source
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
std::string getEoRFilePathOnFU() const