15 #include <boost/algorithm/string.hpp>
16 #include <boost/filesystem/fstream.hpp>
48 #include <boost/lexical_cast.hpp>
50 using namespace jsoncollector;
54 edm::RawInputSource(pset, desc),
55 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
56 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",16)*1048576),
57 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",eventChunkSize_/1048576)*1048576),
58 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",1)),
59 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
60 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
61 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
62 testModeNoBuilderUnit_(edm::Service<evf::
EvFDaqDirector>()->getTestModeNoBuilderUnit()),
68 currentLumiSection_(0),
74 gethostname(thishost, 255);
75 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
77 <<
" MB on host " << thishost;
79 edm::LogInfo(
"FedRawDataInputSource") <<
"Test mode is ON!";
88 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
97 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
98 "no reading enabled with numBuffers parameter 0";
108 edm::LogWarning(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
119 edm::LogWarning(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
136 cvReader_.push_back(
new std::condition_variable);
137 threadInit_.store(
false,std::memory_order_release);
160 std::unique_lock<std::mutex> lk(
mReader_);
198 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
201 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
216 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
255 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
258 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
269 gettimeofday(&tv, 0);
270 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
275 lumiSection, lsopentime,
281 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
297 const size_t headerSize[4] = {0,2*
sizeof(
uint32),(4 + 1024) *
sizeof(
uint32),7*
sizeof(
uint32)};
312 std::unique_lock<std::mutex> lkw(
mWakeup_);
313 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !
currentFile_)
375 <<
" but according to BU JSON there should be "
380 std::unique_lock<std::mutex> lkw(
mWakeup_);
403 "Premature end of input file while reading event header";
421 if (detectedFRDversion_==0) {
422 detectedFRDversion_=*((
uint32*)dataPosition);
423 assert(detectedFRDversion_>=1 && detectedFRDversion_<=3);
431 "Premature end of input file while reading event header";
438 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
439 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
440 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
448 "Premature end of input file while reading event data";
472 unsigned char *dataPosition;
480 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
481 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
482 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
490 "Premature end of input file while reading event data";
521 uint32_t adler = adler32(0
L,Z_NULL,0);
522 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
524 if ( adler !=
event_->adler32() ) {
526 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
527 " but calculated 0x" << adler;
544 catch (
const boost::filesystem::filesystem_error& ex)
546 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
547 <<
". Trying again.";
556 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
557 <<
". Trying again.";
617 bool fileIsBeingProcessed =
false;
620 fileIsBeingProcessed =
true;
624 if (!fileIsBeingProcessed) {
641 uint32_t eventSize =
event_->eventSize();
642 char*
event = (
char*)
event_->payload();
645 while (eventSize > 0) {
646 eventSize -=
sizeof(
fedt_t);
649 eventSize -= (fedSize -
sizeof(
fedh_t));
663 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
671 FEDRawData& fedData = rawData->FEDData(fedId);
673 memcpy(fedData.
data(),
event + eventSize, fedSize);
675 assert(eventSize == 0);
688 std::ostringstream fileNameWithPID;
689 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
690 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
691 jsonDestPath /= fileNameWithPID.str();
693 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
702 catch (
const boost::filesystem::filesystem_error& ex)
705 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
717 catch (
const boost::filesystem::filesystem_error& ex)
720 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
725 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
730 boost::filesystem::ifstream ij(jsonDestPath);
734 if (!reader.
parse(ij, deserializeRoot))
735 throw std::runtime_error(
"Cannot deserialize input JSON file");
753 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
754 " error reading number of events from BU JSON -: No input value " <<
data;
756 return boost::lexical_cast<
int>(
data);
759 catch (
const boost::filesystem::filesystem_error& ex)
763 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
764 <<
" - Maybe the BU run dir disappeared? Ending process with code 0...";
767 catch (std::runtime_error
e)
771 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - runtime Exception -: " << e.what();
774 catch( boost::bad_lexical_cast
const& ) {
775 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - error parsing number of events from BU JSON. "
776 <<
"Input value is -: " <<
data;
783 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
794 edm::LogInfo(
"FedRawDataInputSource") <<
"Instead of delete, RENAME -: " << fileName
797 boost::filesystem::rename(source.replace_extension(
".jsn"),
destination.replace_extension(
".jsn"));
805 InputSource::rewind();
818 unsigned int currentLumiSection = 0;
832 std::unique_lock<std::mutex> lkw(
mWakeup_);
834 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
837 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
872 currentLumiSection =
ls;
877 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
884 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
889 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
893 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
896 stat(rawFile.c_str(),&st);
901 assert( eventsInNewFile>=0 );
902 assert((eventsInNewFile>0) == (fileSize>0));
909 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
912 for (
unsigned int i=0;
i<neededChunks;
i++) {
915 unsigned int newTid = 0xffffffff;
926 if (newChunk ==
nullptr) {
933 std::unique_lock<std::mutex> lk(
mReader_);
936 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
937 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
947 if (!eventsInNewFile) {
949 std::unique_lock<std::mutex> lkw(
mWakeup_);
950 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
958 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
960 std::unique_lock<std::mutex> lkw(
mWakeup_);
964 newChunk->
reset(0,toRead,0);
968 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
969 newInputFile->
chunks_[0]=newChunk;
976 unsigned numFinishedThreads = 0;
980 std::unique_lock<std::mutex> lk(
mReader_);
983 numFinishedThreads++;
994 threadInit_.exchange(
true,std::memory_order_acquire);
998 std::unique_lock<std::mutex> lk(
mReader_);
1022 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1023 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1026 if (fileDescriptor>=0)
1027 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1031 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1032 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1038 unsigned int bufferLeft = 0;
1052 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1053 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1054 close(fileDescriptor);
1067 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1083 if (currentLeft < size) {
1132 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1135 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1141 uint32_t existingSize = 0;
1156 for (uint32_t
i=0;
i<blockcount;
i++) {
1171 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
static const char runNumber_[]
std::vector< std::string > & getData()
unsigned int getgpshigh(const unsigned char *)
tuple start
Check for commandline option errors.
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
unsigned int get(const unsigned char *, bool)
volatile std::atomic< bool > shutdown_flag
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
tuple path
else: Piece not in the list, fine.
void updateFileIndex(int const &fileIndex)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
unsigned int offset(bool)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
char data[epos_bytes_allocation]
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::string getJumpFilePath() const
volatile std::atomic< bool > shutdown_flag false
void setFMS(evf::FastMonitoringService *fms)
static std::string const source
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
std::string getEoRFilePathOnFU() const
void put(BranchDescription const &bd, WrapperOwningHolder const &edp, ProductProvenance const &productProvenance)