16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
50 #include <boost/lexical_cast.hpp>
52 using namespace jsoncollector;
56 edm::RawInputSource(pset, desc),
57 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
58 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",32)*1048576),
59 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",32)*1048576),
60 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",2)),
61 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> (
"maxBufferedFiles",2)),
62 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
63 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
64 verifyChecksum_(pset.getUntrackedParameter<bool> (
"verifyChecksum",
true)),
65 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
66 fileNames_(pset.getUntrackedParameter<std::vector<std::
string>> (
"fileNames",std::vector<std::
string>())),
67 fileListMode_(pset.getUntrackedParameter<bool> (
"fileListMode",
false)),
68 fileListLoopMode_(pset.getUntrackedParameter<bool> (
"fileListLoopMode",
false)),
70 fuOutputDir_(std::
string()),
74 currentLumiSection_(0),
80 gethostname(thishost, 255);
81 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
83 <<
" MB on host " << thishost;
85 long autoRunNumber = -1;
90 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Run number not found from filename";
103 std::string defPathSuffix =
"src/EventFilter/Utilities/plugins/budef.jsd";
106 if (stat(
defPath_.c_str(), &statbuf)) {
108 if (stat(
defPath_.c_str(), &statbuf)) {
115 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
124 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
125 "no reading enabled with numBuffers parameter 0";
132 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
143 throw cms::Exception(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
149 cms::Exception(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
172 cvReader_.push_back(
new std::condition_variable);
174 threadInit_.store(
false,std::memory_order_release);
197 std::unique_lock<std::mutex> lk(
mReader_);
218 desc.
setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk");
219 desc.
addUntracked<
unsigned int> (
"eventChunkSize",32)->setComment(
"Input buffer (chunk) size");
220 desc.
addUntracked<
unsigned int> (
"eventChunkBlock",32)->setComment(
"Block size used in a single file read call (must be smaller or equal to buffer size)");
221 desc.
addUntracked<
unsigned int> (
"numBuffers",2)->setComment(
"Number of buffers used for reading input");
222 desc.
addUntracked<
unsigned int> (
"maxBufferedFiles",2)->setComment(
"Maximum number of simultaneously buffered raw files");
223 desc.
addUntracked<
bool> (
"verifyAdler32",
true)->setComment(
"Verify event Adler32 checksum with FRDv3 or v4");
224 desc.
addUntracked<
bool> (
"verifyChecksum",
true)->setComment(
"Verify event CRC-32C checksum of FRDv5 or higher");
225 desc.
addUntracked<
bool> (
"useL1EventID",
false)->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
226 desc.
addUntracked<
bool> (
"fileListMode",
false)->setComment(
"Use fileNames parameter to directly specify raw files to open");
227 desc.
addUntracked<std::vector<std::string>> (
"fileNames", std::vector<std::string>())->
setComment(
"file list used when fileListMode is enabled");
229 descriptions.
add(
"source", desc);
258 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
261 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
276 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
314 if (checkIfExists==
false || stat(fuBoLS.c_str(), &buf) != 0) {
315 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
329 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
332 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
345 gettimeofday(&tv, 0);
346 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
351 lumiSection, lsopentime,
357 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
388 std::unique_lock<std::mutex> lkw(
mWakeup_);
402 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
459 <<
" but according to BU JSON there should be "
464 std::unique_lock<std::mutex> lkw(
mWakeup_);
487 "Premature end of input file while reading event header";
507 if (detectedFRDversion_==0) {
508 detectedFRDversion_=*((
uint32*)dataPosition);
509 if (detectedFRDversion_>5)
512 assert(detectedFRDversion_>=1);
520 "Premature end of input file while reading event header";
527 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
528 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
529 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
537 "Premature end of input file while reading event data";
563 unsigned char *dataPosition;
571 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
572 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
573 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
581 "Premature end of input file while reading event data";
615 if ( crc !=
event_->crc32c() ) {
618 "Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
619 " but calculated 0x" << crc;
624 uint32_t adler = adler32(0
L,Z_NULL,0);
625 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
627 if ( adler !=
event_->adler32() ) {
630 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
631 " but calculated 0x" << adler;
652 catch (
const boost::filesystem::filesystem_error& ex)
654 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
655 <<
". Trying again.";
660 catch (
const boost::filesystem::filesystem_error&) {}
664 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
665 <<
". Trying again.";
724 bool fileIsBeingProcessed =
false;
727 fileIsBeingProcessed =
true;
731 if (!fileIsBeingProcessed) {
750 gettimeofday(&stv,0);
752 time = (time << 32) + stv.tv_usec;
755 uint32_t eventSize =
event_->eventSize();
756 char*
event = (
char*)
event_->payload();
759 while (eventSize > 0) {
761 eventSize -=
sizeof(
fedt_t);
765 eventSize -= (fedSize -
sizeof(
fedt_t));
770 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
783 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
793 memcpy(fedData.
data(),
event + eventSize, fedSize);
808 std::ostringstream fileNameWithPID;
809 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
810 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
811 jsonDestPath /= fileNameWithPID.str();
813 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
818 catch (
const boost::filesystem::filesystem_error& ex)
821 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
832 catch (
const boost::filesystem::filesystem_error& ex)
835 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
840 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
843 boost::filesystem::ifstream ij(jsonDestPath);
847 std::stringstream
ss;
849 if (!reader.
parse(ss.str(), deserializeRoot)) {
850 edm::LogError(
"FedRawDataInputSource") <<
"Failed to deserialize JSON file -: " << jsonDestPath
852 <<
"CONTENT:\n" << ss.str()<<
".";
853 throw std::runtime_error(
"Cannot deserialize input JSON file");
872 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
873 " error reading number of events from BU JSON -: No input value " <<
data;
875 return boost::lexical_cast<
int>(
data);
878 catch (
const boost::filesystem::filesystem_error& ex)
882 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
884 catch (std::runtime_error
e)
888 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
891 catch( boost::bad_lexical_cast
const& ) {
892 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. "
893 <<
"Input value is -: " <<
data;
900 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
911 InputSource::rewind();
924 unsigned int currentLumiSection = 0;
934 uint32_t lockCount=0;
945 bool copy_active=
false;
961 std::unique_lock<std::mutex> lkw(
mWakeup_);
966 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
997 status =
getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
1007 if (thisLockWaitTimeUs>0.)
1008 sumLockWaitTimeUs+=thisLockWaitTimeUs;
1015 sumLockWaitTimeUs=0;
1036 currentLumiSection =
ls;
1041 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
1051 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
1057 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
1061 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
1064 int stat_res = stat(rawFile.c_str(),&st);
1066 edm::LogError(
"FedRawDataInputSource") <<
"Can not stat file (" << errno <<
"):-"<< rawFile << std::endl;
1071 fileSize=st.st_size;
1078 int eventsInNewFile;
1080 if (fileSize==0) eventsInNewFile=0;
1081 else eventsInNewFile=-1;
1085 assert( eventsInNewFile>=0 );
1086 assert((eventsInNewFile>0) == (fileSize>0));
1100 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
1104 for (
unsigned int i=0;
i<neededChunks;
i++) {
1107 bool copy_active=
false;
1115 unsigned int newTid = 0xffffffff;
1121 bool copy_active=
false;
1134 if (newChunk ==
nullptr) {
1142 std::unique_lock<std::mutex> lk(
mReader_);
1145 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
1146 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
1156 if (!eventsInNewFile) {
1158 std::unique_lock<std::mutex> lkw(
mWakeup_);
1159 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
1168 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
1170 std::unique_lock<std::mutex> lkw(
mWakeup_);
1174 newChunk->
reset(0,toRead,0);
1178 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1179 newInputFile->
chunks_[0]=newChunk;
1188 unsigned numFinishedThreads = 0;
1191 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1192 std::unique_lock<std::mutex> lk(
mReader_);
1195 numFinishedThreads++;
1206 threadInit_.exchange(
true,std::memory_order_acquire);
1211 std::unique_lock<std::mutex> lk(
mReader_);
1236 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1237 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1240 if (fileDescriptor>=0)
1241 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1245 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1246 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1252 unsigned int bufferLeft = 0;
1266 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1267 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1268 close(fileDescriptor);
1281 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1297 if (currentLeft < size) {
1346 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1349 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1355 uint32_t existingSize = 0;
1370 for (uint32_t
i=0;
i<blockcount;
i++) {
1384 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
1398 itr->second+=events;
1408 auto &&
ret = std::pair<bool,unsigned int>(
true,itr->second);
1414 return std::pair<bool,unsigned int>(
false,0);
1421 if (a.rfind(
"/")!=std::string::npos) a=a.substr(a.rfind(
"/"));
1422 if (
b.rfind(
"/")!=std::string::npos)
b=
b.substr(
b.rfind(
"/"));
1429 auto end = fileStem.find(
"_");
1430 if (fileStem.find(
"run")==0) {
1434 long rval = boost::lexical_cast<
long>(runStr);
1435 edm::LogInfo(
"FedRawDataInputSource") <<
"Autodetected run number in fileListMode -: "<<
rval;
1438 catch( boost::bad_lexical_cast
const& ) {
1439 edm::LogWarning(
"FedRawDataInputSource") <<
"Unable to autodetect run number in fileListMode from file -: "<<
fileName;
1450 if (nextFile.find(
"file://")==0) nextFile=nextFile.substr(7);
1451 else if (nextFile.find(
"file:")==0) nextFile=nextFile.substr(5);
1454 if (fileStem.find(
"ls")) fileStem = fileStem.substr(fileStem.find(
"ls")+2);
1455 if (fileStem.find(
"_")) fileStem = fileStem.substr(0,fileStem.find(
"_"));
1458 ls = boost::lexical_cast<
unsigned int>(fileStem);
1474 return getFile(ls,nextFile,fsize,lockWaitTime);
static const char runNumber_[]
void setComment(std::string const &value)
std::vector< std::string > & getData()
tuple ret
prodAgent to be discontinued
unsigned int getgpshigh(const unsigned char *)
tuple start
Check for commandline option errors.
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void setInState(FastMonitoringThread::InputState inputState)
const uint32 FRDHeaderVersionSize[6]
void setInStateSup(FastMonitoringThread::InputState inputState)
volatile std::atomic< bool > shutdown_flag
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
void setComment(std::string const &value)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void updateFileIndex(int const &fileIndex)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID, bool verifyLumiSection)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
void setInputSource(FedRawDataInputSource *inputSource)
unsigned long long int rval
unsigned long long uint64_t
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
bool emptyLumisectionMode() const
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
volatile std::atomic< bool > shutdown_flag false
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
tuple size
Write out results.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const