15 #include <boost/algorithm/string.hpp>
16 #include <boost/filesystem/fstream.hpp>
45 #include <boost/lexical_cast.hpp>
47 using namespace jsoncollector;
51 edm::RawInputSource(pset, desc),
52 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
53 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",16)*1048576),
54 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",eventChunkSize_/1048576)*1048576),
55 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",1)),
56 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
57 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
58 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
59 testModeNoBuilderUnit_(edm::Service<evf::
EvFDaqDirector>()->getTestModeNoBuilderUnit()),
65 currentLumiSection_(0),
70 gethostname(thishost, 255);
71 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
73 <<
" MB on host " << thishost;
75 edm::LogInfo(
"FedRawDataInputSource") <<
"Test mode is ON!";
84 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
93 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
94 "no reading enabled with numBuffers parameter 0";
104 edm::LogWarning(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
115 edm::LogWarning(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
132 cvReader_.push_back(
new std::condition_variable);
133 threadInit_.store(
false,std::memory_order_release);
156 std::unique_lock<std::mutex> lk(
mReader_);
194 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
197 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
212 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
251 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
254 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
265 gettimeofday(&tv, 0);
266 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
271 lumiSection, lsopentime,
277 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
293 const size_t headerSize[4] = {0,2*
sizeof(
uint32),(4 + 1024) *
sizeof(
uint32),7*
sizeof(
uint32)};
308 std::unique_lock<std::mutex> lkw(
mWakeup_);
309 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !
currentFile_)
371 <<
" but according to BU JSON there should be "
376 std::unique_lock<std::mutex> lkw(
mWakeup_);
399 "Premature end of input file while reading event header";
417 if (detectedFRDversion_==0) {
418 detectedFRDversion_=*((
uint32*)dataPosition);
419 assert(detectedFRDversion_>=1 && detectedFRDversion_<=3);
427 "Premature end of input file while reading event header";
434 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
435 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
436 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
444 "Premature end of input file while reading event data";
468 unsigned char *dataPosition;
476 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
477 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
478 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
486 "Premature end of input file while reading event data";
517 uint32_t adler = adler32(0
L,Z_NULL,0);
518 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
520 if ( adler !=
event_->adler32() ) {
522 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
523 " but calculated 0x" << adler;
540 catch (
const boost::filesystem::filesystem_error& ex)
542 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
543 <<
". Trying again.";
552 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
553 <<
". Trying again.";
600 bool fileIsBeingProcessed =
false;
603 fileIsBeingProcessed =
true;
607 if (!fileIsBeingProcessed) {
624 uint32_t eventSize =
event_->eventSize();
625 char*
event = (
char*)
event_->payload();
628 while (eventSize > 0) {
629 eventSize -=
sizeof(
fedt_t);
632 eventSize -= (fedSize -
sizeof(
fedh_t));
643 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
651 FEDRawData& fedData = rawData->FEDData(fedId);
653 memcpy(fedData.
data(),
event + eventSize, fedSize);
655 assert(eventSize == 0);
668 std::ostringstream fileNameWithPID;
669 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
670 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
671 jsonDestPath /= fileNameWithPID.str();
673 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
682 catch (
const boost::filesystem::filesystem_error& ex)
685 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
697 catch (
const boost::filesystem::filesystem_error& ex)
700 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
705 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
710 boost::filesystem::ifstream ij(jsonDestPath);
714 if (!reader.
parse(ij, deserializeRoot))
715 throw std::runtime_error(
"Cannot deserialize input JSON file");
733 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
734 " error reading number of events from BU JSON -: No input value " <<
data;
736 return boost::lexical_cast<
int>(
data);
739 catch (
const boost::filesystem::filesystem_error& ex)
743 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
744 <<
" - Maybe the BU run dir disappeared? Ending process with code 0...";
747 catch (std::runtime_error
e)
751 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - runtime Exception -: " << e.what();
754 catch( boost::bad_lexical_cast
const& ) {
755 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - error parsing number of events from BU JSON. "
756 <<
"Input value is -: " <<
data;
763 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
774 edm::LogInfo(
"FedRawDataInputSource") <<
"Instead of delete, RENAME -: " << fileName
775 <<
" to: " << destination.string();
776 boost::filesystem::rename(source,destination);
777 boost::filesystem::rename(source.replace_extension(
".jsn"),destination.replace_extension(
".jsn"));
785 InputSource::rewind();
798 unsigned int currentLumiSection = 0;
812 std::unique_lock<std::mutex> lkw(
mWakeup_);
814 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
817 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
852 currentLumiSection =
ls;
857 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
864 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
869 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
873 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
876 stat(rawFile.c_str(),&st);
881 assert( eventsInNewFile>=0 );
882 assert((eventsInNewFile>0) == (fileSize>0));
889 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
892 for (
unsigned int i=0;
i<neededChunks;
i++) {
895 unsigned int newTid = 0xffffffff;
906 if (newChunk ==
nullptr) {
913 std::unique_lock<std::mutex> lk(
mReader_);
916 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
917 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
927 if (!eventsInNewFile) {
929 std::unique_lock<std::mutex> lkw(
mWakeup_);
930 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
938 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
940 std::unique_lock<std::mutex> lkw(
mWakeup_);
944 newChunk->
reset(0,toRead,0);
948 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
949 newInputFile->
chunks_[0]=newChunk;
956 unsigned numFinishedThreads = 0;
960 std::unique_lock<std::mutex> lk(
mReader_);
963 numFinishedThreads++;
974 threadInit_.exchange(
true,std::memory_order_acquire);
978 std::unique_lock<std::mutex> lk(
mReader_);
1002 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1003 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1006 if (fileDescriptor>=0)
1007 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1011 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1012 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1018 unsigned int bufferLeft = 0;
1032 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1033 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1034 close(fileDescriptor);
1047 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1063 if (currentLeft < size) {
1112 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1115 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1121 uint32_t existingSize = 0;
1136 for (uint32_t
i=0;
i<blockcount;
i++) {
1151 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
static const char runNumber_[]
std::vector< std::string > & getData()
unsigned int getgpshigh(const unsigned char *)
tuple start
Check for commandline option errors.
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
unsigned int get(const unsigned char *, bool)
volatile std::atomic< bool > shutdown_flag
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
tuple path
else: Piece not in the list, fine.
void updateFileIndex(int const &fileIndex)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
unsigned int offset(bool)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
char data[epos_bytes_allocation]
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::string getJumpFilePath() const
volatile std::atomic< bool > shutdown_flag false
void setFMS(evf::FastMonitoringService *fms)
static std::string const source
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
std::string getEoRFilePathOnFU() const
void put(BranchDescription const &bd, WrapperOwningHolder const &edp, ProductProvenance const &productProvenance)