16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
50 #include <boost/lexical_cast.hpp>
52 using namespace jsoncollector;
56 edm::RawInputSource(pset, desc),
57 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
58 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",16)*1048576),
59 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",eventChunkSize_/1048576)*1048576),
60 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",1)),
61 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
62 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
63 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
64 testModeNoBuilderUnit_(edm::Service<evf::
EvFDaqDirector>()->getTestModeNoBuilderUnit()),
70 currentLumiSection_(0),
76 gethostname(thishost, 255);
77 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
79 <<
" MB on host " << thishost;
81 edm::LogInfo(
"FedRawDataInputSource") <<
"Test mode is ON!";
90 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
99 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
100 "no reading enabled with numBuffers parameter 0";
110 edm::LogWarning(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
121 edm::LogWarning(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
138 cvReader_.push_back(
new std::condition_variable);
139 threadInit_.store(
false,std::memory_order_release);
162 std::unique_lock<std::mutex> lk(
mReader_);
200 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
203 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
218 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
257 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
260 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
271 gettimeofday(&tv, 0);
272 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
277 lumiSection, lsopentime,
283 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
299 const size_t headerSize[4] = {0,2*
sizeof(
uint32),(4 + 1024) *
sizeof(
uint32),7*
sizeof(
uint32)};
314 std::unique_lock<std::mutex> lkw(
mWakeup_);
315 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !
currentFile_)
327 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
380 <<
" but according to BU JSON there should be "
385 std::unique_lock<std::mutex> lkw(
mWakeup_);
408 "Premature end of input file while reading event header";
426 if (detectedFRDversion_==0) {
427 detectedFRDversion_=*((
uint32*)dataPosition);
428 assert(detectedFRDversion_>=1 && detectedFRDversion_<=3);
436 "Premature end of input file while reading event header";
443 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
444 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
445 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
453 "Premature end of input file while reading event data";
477 unsigned char *dataPosition;
485 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
486 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
487 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
495 "Premature end of input file while reading event data";
526 uint32_t adler = adler32(0
L,Z_NULL,0);
527 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
529 if ( adler !=
event_->adler32() ) {
531 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
532 " but calculated 0x" << adler;
549 catch (
const boost::filesystem::filesystem_error& ex)
551 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
552 <<
". Trying again.";
561 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
562 <<
". Trying again.";
622 bool fileIsBeingProcessed =
false;
625 fileIsBeingProcessed =
true;
629 if (!fileIsBeingProcessed) {
647 gettimeofday(&stv,0);
649 time = (time << 32) + stv.tv_usec;
652 uint32_t eventSize =
event_->eventSize();
653 char*
event = (
char*)
event_->payload();
656 while (eventSize > 0) {
657 eventSize -=
sizeof(
fedt_t);
660 eventSize -= (fedSize -
sizeof(
fedh_t));
674 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
684 memcpy(fedData.
data(),
event + eventSize, fedSize);
686 assert(eventSize == 0);
699 std::ostringstream fileNameWithPID;
700 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
701 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
702 jsonDestPath /= fileNameWithPID.str();
704 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
713 catch (
const boost::filesystem::filesystem_error& ex)
716 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
728 catch (
const boost::filesystem::filesystem_error& ex)
731 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
736 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
741 boost::filesystem::ifstream ij(jsonDestPath);
745 if (!reader.
parse(ij, deserializeRoot))
746 throw std::runtime_error(
"Cannot deserialize input JSON file");
764 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
765 " error reading number of events from BU JSON -: No input value " <<
data;
767 return boost::lexical_cast<
int>(
data);
770 catch (
const boost::filesystem::filesystem_error& ex)
774 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
776 catch (std::runtime_error
e)
780 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
783 catch( boost::bad_lexical_cast
const& ) {
784 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. "
785 <<
"Input value is -: " <<
data;
792 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
803 edm::LogInfo(
"FedRawDataInputSource") <<
"Instead of delete, RENAME -: " << fileName
806 boost::filesystem::rename(source.replace_extension(
".jsn"),
destination.replace_extension(
".jsn"));
814 InputSource::rewind();
827 unsigned int currentLumiSection = 0;
841 std::unique_lock<std::mutex> lkw(
mWakeup_);
843 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
846 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
881 currentLumiSection =
ls;
886 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
895 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
900 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
904 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
907 stat(rawFile.c_str(),&st);
912 assert( eventsInNewFile>=0 );
913 assert((eventsInNewFile>0) == (fileSize>0));
920 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
923 for (
unsigned int i=0;
i<neededChunks;
i++) {
926 unsigned int newTid = 0xffffffff;
937 if (newChunk ==
nullptr) {
944 std::unique_lock<std::mutex> lk(
mReader_);
947 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
948 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
958 if (!eventsInNewFile) {
960 std::unique_lock<std::mutex> lkw(
mWakeup_);
961 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
969 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
971 std::unique_lock<std::mutex> lkw(
mWakeup_);
975 newChunk->
reset(0,toRead,0);
979 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
980 newInputFile->
chunks_[0]=newChunk;
987 unsigned numFinishedThreads = 0;
991 std::unique_lock<std::mutex> lk(
mReader_);
994 numFinishedThreads++;
1005 threadInit_.exchange(
true,std::memory_order_acquire);
1009 std::unique_lock<std::mutex> lk(
mReader_);
1033 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1034 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1037 if (fileDescriptor>=0)
1038 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1042 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1043 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1049 unsigned int bufferLeft = 0;
1063 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1064 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1065 close(fileDescriptor);
1078 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1094 if (currentLeft < size) {
1143 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1146 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1152 uint32_t existingSize = 0;
1167 for (uint32_t
i=0;
i<blockcount;
i++) {
1182 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
static const char runNumber_[]
std::vector< std::string > & getData()
unsigned int getgpshigh(const unsigned char *)
tuple start
Check for commandline option errors.
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
unsigned int get(const unsigned char *, bool)
volatile std::atomic< bool > shutdown_flag
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
tuple path
else: Piece not in the list, fine.
void updateFileIndex(int const &fileIndex)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
unsigned int offset(bool)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
char data[epos_bytes_allocation]
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::string getJumpFilePath() const
volatile std::atomic< bool > shutdown_flag false
void setFMS(evf::FastMonitoringService *fms)
static std::string const source
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
std::string getEoRFilePathOnFU() const